Skip to content

Commit

Permalink
HBASE-26125 Backport HBASE-25401 "Add trace support for async call in…
Browse files Browse the repository at this point in the history
… rpc client" to branch-2 (apache#3543)

2/17 commits of HBASE-22120

Co-authored-by: Duo Zhang <zhangduo@apache.org>

Signed-off-by: Peter Somogyi <psomogyi@apache.org>
  • Loading branch information
taklwu authored Aug 2, 2021
1 parent da9bcb6 commit ca09643
Show file tree
Hide file tree
Showing 14 changed files with 299 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collection;
Expand All @@ -38,6 +41,7 @@
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
Expand Down Expand Up @@ -365,7 +369,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
protected abstract T createConnection(ConnectionId remoteId) throws IOException;

private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
RpcCallback<Message> callback) {
RpcCallback<Message> callback) {
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
if (metrics != null) {
metrics.updateRpc(call.md, call.param, call.callStats);
Expand All @@ -388,44 +392,59 @@ private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
}
}

Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket,
final Address addr, final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());

if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
int numActions = 0;
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
numActions += regionAction.getActionCount();
}
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
.startSpan();
try (Scope scope = span.makeCurrent()) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());

if (param instanceof ClientProtos.MultiRequest) {
ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
int numActions = 0;
for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
numActions += regionAction.getActionCount();
}

cs.setNumActionsPerServer(numActions);
}
cs.setNumActionsPerServer(numActions);
}

final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
try (Scope scope = call.span.makeCurrent()) {
counter.decrementAndGet();
onCallFinished(call, hrc, addr, callback);
} finally {
if (hrc.failed()) {
span.setStatus(StatusCode.ERROR);
span.recordException(hrc.getFailed());
} else {
span.setStatus(StatusCode.OK);
}
span.end();
}
}
}, cs);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
int count = counter.incrementAndGet();
try {
if (count > maxConcurrentCallsPerServer) {
throw new ServerTooBusyException(addr, count);
}
cs.setConcurrentCallsPerServer(count);
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {
call.setException(toIOE(e));
span.end();
}
cs.setConcurrentCallsPerServer(count);
T connection = getConnection(remoteId);
connection.sendRequest(call, hrc);
} catch (Exception e) {
call.setException(toIOE(e));
return call;
}
return call;
}

private static Address createAddr(ServerName sn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
Expand Down Expand Up @@ -57,7 +55,6 @@
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
Expand Down Expand Up @@ -192,8 +189,8 @@ public void run() {
if (call.isDone()) {
continue;
}
try {
tracedWriteRequest(call);
try (Scope scope = call.span.makeCurrent()) {
writeRequest(call);
} catch (IOException e) {
// An exception here means the call has not been added to pendingCalls yet, so we need
// to fail it ourselves.
Expand Down Expand Up @@ -594,16 +591,6 @@ private void negotiateCryptoAes(RPCProtos.CryptoCipherMeta cryptoCipherMeta)
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
}

/**
 * Writes the request for {@code call} inside a dedicated tracing span.
 * <p>
 * A span named "RpcClientImpl.tracedWriteRequest" is started with its parent set to the current
 * context combined with {@code call.span}. The span is made current while
 * {@code writeRequest(call)} runs, and it is ended in the {@code finally} block whether or not
 * the write throws.
 * @param call the call whose request is written
 * @throws IOException if {@code writeRequest(call)} fails
 */
private void tracedWriteRequest(Call call) throws IOException {
Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
.setParent(Context.current().with(call.span)).startSpan();
try (Scope scope = span.makeCurrent()) {
writeRequest(call);
} finally {
span.end();
}
}

/**
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
* the Connection thread, but by other threads.
Expand Down Expand Up @@ -811,7 +798,9 @@ public void run(boolean cancelled) throws IOException {
if (callSender != null) {
callSender.sendCall(call);
} else {
tracedWriteRequest(call);
// This runs in the same thread as the caller, so there is no need to attach the trace
// context again.
writeRequest(call);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class Call {
final Span span;
Timeout timeoutTask;

protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
Call(int id, final Descriptors.MethodDescriptor md, Message param,
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -49,6 +51,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;

/**
* Utility to help ipc'ing.
Expand Down Expand Up @@ -112,11 +115,10 @@ public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setCallId(call.id);
//TODO handle htrace API change, see HBASE-18895
/*if (call.span != null) {
builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
.setTraceId(call.span.getTracerId()));
}*/
RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
builder.setTraceInfo(traceBuilder.build());
builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null);
if (cellBlockMeta != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -114,9 +115,12 @@ private void writeRequest(ChannelHandlerContext ctx, Call call, ChannelPromise p

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
throws Exception {
if (msg instanceof Call) {
writeRequest(ctx, (Call) msg, promise);
Call call = (Call) msg;
try (Scope scope = call.span.makeCurrent()) {
writeRequest(ctx, call, promise);
}
} else {
ctx.write(msg, promise);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.trace;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import org.apache.yetus.audience.InterfaceAudience;

Expand All @@ -30,6 +30,6 @@ private TraceUtil() {
}

public static Tracer getGlobalTracer() {
return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
}
}
14 changes: 7 additions & 7 deletions hbase-protocol-shaded/src/main/protobuf/Tracing.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

//Used to pass through the information necessary to continue
//a trace after an RPC is made. All we need is the traceid
//(so we know the overarching trace this message is a part of), and
//the id of the current span when this message was sent, so we know
//what span caused the new span we will create when this message is received.
// OpenTelemetry propagates trace context through https://www.w3.org/TR/trace-context/, which
// is a text-based approach that passes properties as HTTP headers. We use the same approach
// here, so all we need is a map to store the key-value pairs.

message RPCTInfo {
optional int64 trace_id = 1;
optional int64 parent_id = 2;
optional int64 trace_id = 1 [deprecated = true];
optional int64 parent_id = 2 [deprecated = true];
map<string, string> headers = 3;
}
10 changes: 10 additions & 0 deletions hbase-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,16 @@
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
Expand Down Expand Up @@ -73,15 +75,6 @@ public RpcCall getRpcCall() {
return call;
}

/**
 * Returns the {@code call} field cast to {@code ServerCall<?>}. Kept for backward compatibility.
 * @return the {@code call} field cast to {@code ServerCall<?>}
 * @deprecated As of release 2.0, this will be removed in HBase 3.0
 */
@Deprecated
public ServerCall<?> getCall() {
return (ServerCall<?>) call;
}

/**
 * Stores the given handler in this object's {@code status} field.
 * @param status the handler to store
 */
public void setStatus(MonitoredRPCHandler status) {
this.status = status;
}
Expand Down Expand Up @@ -130,7 +123,8 @@ public void run() {
String serviceName = getServiceName();
String methodName = getMethodName();
String traceString = serviceName + "." + methodName;
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
.setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
Expand All @@ -141,8 +135,12 @@ public void run() {
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
span.recordException(e);
span.setStatus(StatusCode.ERROR);
return;
} catch (Throwable e) {
span.recordException(e);
span.setStatus(StatusCode.ERROR);
if (e instanceof ServerNotRunningYetException) {
// If ServerNotRunningYetException, don't spew stack trace.
if (RpcServer.LOG.isTraceEnabled()) {
Expand All @@ -161,6 +159,7 @@ public void run() {
RpcServer.CurCall.set(null);
if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1);
span.setStatus(StatusCode.OK);
sucessful = true;
}
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -102,6 +104,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
// from WAL side on release
private final AtomicInteger reference = new AtomicInteger(0x80000000);

private final Span span;

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
Expand Down Expand Up @@ -132,6 +136,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
this.bbAllocator = byteBuffAllocator;
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
this.span = Span.current();
}

/**
Expand All @@ -150,6 +155,7 @@ public void done() {
// If the call was run successfully, we might have already returned the BB
// back to pool. No worries. Then inputCellBlock will be null
cleanup();
span.end();
}

@Override
Expand Down Expand Up @@ -226,6 +232,10 @@ public synchronized void setResponse(Message m, final CellScanner cells, Throwab
}
if (t != null) {
this.isError = true;
span.recordException(t);
span.setStatus(StatusCode.ERROR);
} else {
span.setStatus(StatusCode.OK);
}
BufferChain bc = null;
try {
Expand Down Expand Up @@ -560,4 +570,8 @@ public synchronized BufferChain getResponse() {
return response;
}
}

/**
 * Returns the span held by this call.
 * <p>
 * The {@code span} field is final and is assigned from {@code Span.current()} in the constructor.
 * @return the value of the {@code span} field
 */
public Span getSpan() {
return span;
}
}
Loading

0 comments on commit ca09643

Please sign in to comment.