Skip to content

Commit

Permalink
HBASE-26764 Implement generic exception support for TraceUtil methods…
Browse files Browse the repository at this point in the history
… over Callables and Runnables

For the `TraceUtil` methods that accept `Callable` and `Runnable` types, make them generic over a
child of `Throwable`. This allows us to consolidate the two method signatures into a single more
flexible definition.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
ndimiduk committed Mar 3, 2022
1 parent dbaa68a commit 9c037b1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import io.opentelemetry.api.trace.Span;
import java.io.IOException;
import java.net.SocketAddress;
Expand All @@ -40,7 +39,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
Expand All @@ -61,10 +59,8 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
Expand Down Expand Up @@ -444,13 +440,7 @@ public CompletableFuture<Hbck> getHbck() {

@Override
public Hbck getHbck(ServerName masterServer) {
return TraceUtil.trace(new Supplier<Hbck>() {

@Override
public Hbck get() {
return getHbckInternal(masterServer);
}
}, "AsyncConnection.getHbck");
return TraceUtil.trace(() -> getHbckInternal(masterServer), "AsyncConnection.getHbck");
}

Optional<MetricsConnection> getConnectionMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.Version;
Expand Down Expand Up @@ -84,7 +84,7 @@ public static <T> CompletableFuture<T> tracedFuture(
Supplier<Span> spanSupplier
) {
Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
try (Scope ignored = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
endSpan(future, span);
return future;
Expand All @@ -97,7 +97,7 @@ public static <T> CompletableFuture<T> tracedFuture(
public static <T> CompletableFuture<T> tracedFuture(Supplier<CompletableFuture<T>> action,
String spanName) {
Span span = createSpan(spanName);
try (Scope scope = span.makeCurrent()) {
try (Scope ignored = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
endSpan(future, span);
return future;
Expand All @@ -113,7 +113,7 @@ public static <T> List<CompletableFuture<T>> tracedFutures(
Supplier<Span> spanSupplier
) {
Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
try (Scope ignored = span.makeCurrent()) {
List<CompletableFuture<T>> futures = action.get();
endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);
return futures;
Expand All @@ -139,29 +139,29 @@ private static void endSpan(CompletableFuture<?> future, Span span) {
});
}

public static void trace(Runnable action, String spanName) {
trace(action, () -> createSpan(spanName));
/**
* A {@link Runnable} that may also throw.
* @param <T> the type of {@link Throwable} that can be produced.
*/
@FunctionalInterface
public interface ThrowingRunnable<T extends Throwable> {
void run() throws T;
}

public static void trace(Runnable action, Supplier<Span> creator) {
Span span = creator.get();
try (Scope scope = span.makeCurrent()) {
action.run();
span.setStatus(StatusCode.OK);
} catch (Throwable e) {
setError(span, e);
throw e;
} finally {
span.end();
}
public static <T extends Throwable> void trace(
final ThrowingRunnable<T> runnable,
final String spanName) throws T {
trace(runnable, () -> createSpan(spanName));
}

public static <T> T trace(Supplier<T> action, String spanName) {
Span span = createSpan(spanName);
try (Scope scope = span.makeCurrent()) {
T ret = action.get();
public static <T extends Throwable> void trace(
final ThrowingRunnable<T> runnable,
final Supplier<Span> spanSupplier
) throws T {
Span span = spanSupplier.get();
try (Scope ignored = span.makeCurrent()) {
runnable.run();
span.setStatus(StatusCode.OK);
return ret;
} catch (Throwable e) {
setError(span, e);
throw e;
Expand All @@ -170,20 +170,30 @@ public static <T> T trace(Supplier<T> action, String spanName) {
}
}

/**
* A {@link Callable} that may also throw.
* @param <R> the result type of method call.
* @param <T> the type of {@link Throwable} that can be produced.
*/
@FunctionalInterface
public interface IOExceptionCallable<V> {
V call() throws IOException;
public interface ThrowingCallable<R, T extends Throwable> {
R call() throws T;
}

public static <T> T trace(IOExceptionCallable<T> callable, String spanName) throws IOException {
public static <R, T extends Throwable> R trace(
final ThrowingCallable<R, T> callable,
final String spanName
) throws T {
return trace(callable, () -> createSpan(spanName));
}

public static <T> T trace(IOExceptionCallable<T> callable, Supplier<Span> creator)
throws IOException {
Span span = creator.get();
try (Scope scope = span.makeCurrent()) {
T ret = callable.call();
public static <R, T extends Throwable> R trace(
final ThrowingCallable<R, T> callable,
final Supplier<Span> spanSupplier
) throws T {
Span span = spanSupplier.get();
try (Scope ignored = span.makeCurrent()) {
final R ret = callable.call();
span.setStatus(StatusCode.OK);
return ret;
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,18 +590,12 @@ public final void sync(long txid) throws IOException {

@Override
public final void sync(boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(forceSync);
return null;
}, () -> createSpan("WAL.sync"));
TraceUtil.trace(() -> doSync(forceSync), () -> createSpan("WAL.sync"));
}

@Override
public final void sync(long txid, boolean forceSync) throws IOException {
TraceUtil.trace(() -> {
doSync(txid, forceSync);
return null;
}, () -> createSpan("WAL.sync"));
TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
}

protected abstract void doSync(boolean forceSync) throws IOException;
Expand Down

0 comments on commit 9c037b1

Please sign in to comment.