Skip to content

Commit

Permalink
HBASE-25455 Add trace support for HRegion read/write operation (#2861)
Browse files Browse the repository at this point in the history
Signed-off-by: Guanghao Zhang <zghao@apache.org>
  • Loading branch information
Apache9 committed Mar 28, 2021
1 parent a0ecdf2 commit 3038c6a
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
if (error != null) {
span.recordException(error);
span.setStatus(StatusCode.ERROR);
TraceUtil.setError(span, error);
} else {
List<String> regionNames = getRegionNames.apply(resp);
if (!regionNames.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,7 @@ public void run(Call call) {
onCallFinished(call, hrc, addr, callback);
} finally {
if (hrc.failed()) {
span.setStatus(StatusCode.ERROR);
span.recordException(hrc.getFailed());
TraceUtil.setError(span, hrc.getFailed());
} else {
span.setStatus(StatusCode.OK);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
Expand Down Expand Up @@ -58,6 +59,9 @@ public final class TraceUtil {

public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;

public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
AttributeKey.booleanKey("db.hbase.rowlock.readlock");

private TraceUtil() {
}

Expand Down Expand Up @@ -139,14 +143,18 @@ public static <T> List<CompletableFuture<T>> tracedFutures(
}
}

public static void setError(Span span, Throwable error) {
span.recordException(error);
span.setStatus(StatusCode.ERROR);
}

/**
* Finish the {@code span} when the given {@code future} is completed.
*/
private static void endSpan(CompletableFuture<?> future, Span span) {
FutureUtils.addListener(future, (resp, error) -> {
if (error != null) {
span.recordException(error);
span.setStatus(StatusCode.ERROR);
setError(span, error);
} else {
span.setStatus(StatusCode.OK);
}
Expand All @@ -164,8 +172,32 @@ public static void trace(Runnable action, Supplier<Span> creator) {
action.run();
span.setStatus(StatusCode.OK);
} catch (Throwable e) {
span.recordException(e);
span.setStatus(StatusCode.ERROR);
setError(span, e);
throw e;
} finally {
span.end();
}
}

@FunctionalInterface
public interface IOExceptionCallable<V> {
V call() throws IOException;
}

public static <T> T trace(IOExceptionCallable<T> callable, String spanName) throws IOException {
return trace(callable, () -> createSpan(spanName));
}

public static <T> T trace(IOExceptionCallable<T> callable, Supplier<Span> creator)
throws IOException {
Span span = creator.get();
try (Scope scope = span.makeCurrent()) {
T ret = callable.call();
span.setStatus(StatusCode.OK);
return ret;
} catch (Throwable e) {
setError(span, e);
throw e;
} finally {
span.end();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,10 @@ public void run() {
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
span.recordException(e);
span.setStatus(StatusCode.ERROR);
TraceUtil.setError(span, e);
return;
} catch (Throwable e) {
span.recordException(e);
span.setStatus(StatusCode.ERROR);
TraceUtil.setError(span, e);
if (e instanceof ServerNotRunningYetException) {
// If ServerNotRunningYetException, don't spew stack trace.
if (RpcServer.LOG.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
Expand Down Expand Up @@ -232,8 +233,7 @@ public synchronized void setResponse(Message m, final CellScanner cells, Throwab
}
if (t != null) {
this.isError = true;
span.recordException(t);
span.setStatus(StatusCode.ERROR);
TraceUtil.setError(span, t);
} else {
span.setStatus(StatusCode.OK);
}
Expand Down
Loading

0 comments on commit 3038c6a

Please sign in to comment.