Skip to content

Commit

Permalink
add connection and request headers to slowlog payload
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Jul 30, 2023
1 parent 0bbc8d1 commit 30de63c
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ option java_outer_classname = "TooSlowLog";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "client/Client.proto";

message SlowLogPayload {
Expand All @@ -49,6 +50,9 @@ message SlowLogPayload {
optional int64 block_bytes_scanned = 16;
optional Scan scan = 17;

repeated NameBytesPair connection_attribute = 18;
repeated NameBytesPair request_attribute = 19;

// SLOW_LOG is RPC call slow in nature whereas LARGE_LOG is RPC call quite large.
// Majority of times, slow logs are also large logs and hence, ALL is combination of
// both
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
.setProcessingTime(processingTime).setQueueTime(qTime)
.setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
.setResponseSize(responseSize).setBlockBytesScanned(blockBytesScanned)
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName);
.setServerClass(className).setStartTime(startTime).setType(type).setUserName(userName)
.addAllRequestAttribute(rpcCall.getHeader().getAttributeList())
.addAllConnectionAttribute(rpcCall.getConnectionHeader().getAttributeList());
if (slowLogParams != null && slowLogParams.getScan() != null) {
slowLogPayloadBuilder.setScan(slowLogParams.getScan());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
Expand All @@ -71,6 +72,20 @@ public class TestNamedQueueRecorder {
private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);

private static final HBaseTestingUtil HBASE_TESTING_UTILITY = new HBaseTestingUtil();
private static final List<HBaseProtos.NameBytesPair> REQUEST_HEADERS =
ImmutableList.<HBaseProtos.NameBytesPair> builder()
.add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
.setValue(ByteString.copyFromUtf8("r")).build())
.add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
.setValue(ByteString.copyFromUtf8("h")).build())
.build();
private static final List<HBaseProtos.NameBytesPair> CONNECTION_HEADERS =
ImmutableList.<HBaseProtos.NameBytesPair> builder()
.add(HBaseProtos.NameBytesPair.newBuilder().setName("1")
.setValue(ByteString.copyFromUtf8("c")).build())
.add(HBaseProtos.NameBytesPair.newBuilder().setName("2")
.setValue(ByteString.copyFromUtf8("h")).build())
.build();

private NamedQueueRecorder namedQueueRecorder;

Expand Down Expand Up @@ -599,6 +614,54 @@ public void testOnlineSlowLogScanPayloadExplicitlyEnabled() throws Exception {
}));
}

@Test
public void testOnlineSlowLogRequestAttributes() throws Exception {
Configuration conf = applySlowLogRecorderConf(1);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();

Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (slowLogPayload.isPresent() && !slowLogPayload.get().getRequestAttributeList().isEmpty()) {
return slowLogPayload.get().getRequestAttributeList().containsAll(REQUEST_HEADERS);
}
return false;
}));
}

@Test
public void testOnlineSlowLogConnectionAttributes() throws Exception {
Configuration conf = applySlowLogRecorderConf(1);
Constructor<NamedQueueRecorder> constructor =
NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
constructor.setAccessible(true);
namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(1).build();

Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
RpcLogDetails rpcLogDetails = getRpcLogDetailsOfScan();
namedQueueRecorder.addRecord(rpcLogDetails);
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000, () -> {
Optional<SlowLogPayload> slowLogPayload = getSlowLogPayloads(request).stream().findAny();
if (
slowLogPayload.isPresent() && !slowLogPayload.get().getConnectionAttributeList().isEmpty()
) {
return slowLogPayload.get().getConnectionAttributeList().containsAll(CONNECTION_HEADERS);
}
return false;
}));
}

static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className,
int forcedParamIndex) {
RpcCall rpcCall = getRpcCall(userName, forcedParamIndex);
Expand Down Expand Up @@ -691,12 +754,16 @@ public long getSize() {

@Override
public RPCProtos.RequestHeader getHeader() {
return null;
RPCProtos.RequestHeader.Builder builder = RPCProtos.RequestHeader.newBuilder();
REQUEST_HEADERS.forEach(builder::addAttribute);
return builder.build();
}

@Override
public RPCProtos.ConnectionHeader getConnectionHeader() {
return null;
RPCProtos.ConnectionHeader.Builder builder = RPCProtos.ConnectionHeader.newBuilder();
CONNECTION_HEADERS.forEach(builder::addAttribute);
return builder.build();
}

@Override
Expand Down

0 comments on commit 30de63c

Please sign in to comment.