Skip to content

Commit

Permalink
HBASE-26473 Introduce db.hbase.container_operations span attribute
Browse files Browse the repository at this point in the history
For batch operations, collect and annotate the associated span with the set of all operations
contained in the batch.

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
ndimiduk committed Jan 27, 2022
1 parent a8b68c9 commit 4748707
Show file tree
Hide file tree
Showing 7 changed files with 210 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -406,7 +407,8 @@ protected Result rpcCall() throws Exception {
public Result[] get(List<Get> gets) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(gets);
return TraceUtil.trace(() -> {
if (gets.size() == 1) {
return new Result[] { get(gets.get(0)) };
Expand All @@ -433,7 +435,8 @@ public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions);
TraceUtil.traceWithIOException(() -> {
int rpcTimeout = writeRpcTimeoutMs;
boolean hasRead = false;
Expand Down Expand Up @@ -473,6 +476,7 @@ public void batch(final List<? extends Row> actions, final Object[] results, int
final Span span = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions)
.build();
try (Scope ignored = span.makeCurrent()) {
AsyncRequestFuture ars = multiAp.submit(task);
Expand All @@ -481,6 +485,7 @@ public void batch(final List<? extends Row> actions, final Object[] results, int
TraceUtil.setError(span, ars.getErrors());
throw ars.getErrors();
}
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
Expand Down Expand Up @@ -512,6 +517,7 @@ public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[]
final Span span = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions)
.build();
try (Scope ignored = span.makeCurrent()) {
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
Expand Down Expand Up @@ -551,7 +557,8 @@ protected Void rpcCall() throws Exception {
public void delete(final List<Delete> deletes) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(deletes);
TraceUtil.traceWithIOException(() -> {
Object[] results = new Object[deletes.size()];
try {
Expand Down Expand Up @@ -600,7 +607,8 @@ protected Void rpcCall() throws Exception {
public void put(final List<Put> puts) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(puts);
TraceUtil.traceWithIOException(() -> {
for (Put put : puts) {
validatePut(put);
Expand All @@ -618,7 +626,8 @@ public void put(final List<Put> puts) throws IOException {
public Result mutateRow(final RowMutations rm) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(rm);
return TraceUtil.trace(() -> {
long nonceGroup = getNonceGroup();
long nonce = getNonce();
Expand Down Expand Up @@ -773,7 +782,8 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final byte [] value, final Put put) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
.isSuccess(),
Expand All @@ -786,7 +796,8 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, put).isSuccess(),
Expand All @@ -799,7 +810,8 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final CompareOperator op, final byte [] value, final Put put) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
supplier);
Expand All @@ -811,7 +823,8 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
final byte[] value, final Delete delete) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
delete).isSuccess(),
Expand All @@ -824,7 +837,8 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, delete).isSuccess(),
Expand All @@ -837,7 +851,8 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
supplier);
Expand Down Expand Up @@ -914,7 +929,8 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(rm);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, rm).isSuccess(),
Expand All @@ -927,7 +943,8 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(rm);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
supplier);
Expand All @@ -937,7 +954,8 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(checkAndMutate);
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate);
return TraceUtil.trace(() -> {
Row action = checkAndMutate.getAction();
if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
Expand Down Expand Up @@ -986,7 +1004,8 @@ public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMu
throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(checkAndMutates);
return TraceUtil.trace(() -> {
if (checkAndMutates.isEmpty()) {
return Collections.emptyList();
Expand Down Expand Up @@ -1056,7 +1075,8 @@ public boolean exists(final Get get) throws IOException {
public boolean[] exists(List<Get> gets) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(gets);
return TraceUtil.trace(() -> {
if (gets.isEmpty()) {
return new boolean[] {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
Expand All @@ -401,7 +402,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -417,7 +419,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
preCheck();
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand Down Expand Up @@ -460,7 +463,8 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
Expand All @@ -475,7 +479,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -490,7 +495,8 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand All @@ -512,7 +518,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutate);
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate.getAction());
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
Expand Down Expand Up @@ -565,7 +572,8 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutates);
.setOperation(checkAndMutates)
.setContainerOperations(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
Expand Down Expand Up @@ -621,7 +629,8 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(mutations);
.setOperation(mutations)
.setContainerOperations(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
Expand Down Expand Up @@ -694,28 +703,32 @@ public void onComplete() {
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(gets);
.setOperation(gets)
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}

@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(puts);
.setOperation(puts)
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
return tracedFutures(() -> voidMutate(puts), supplier);
}

@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(deletes);
.setOperation(deletes)
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
return tracedFutures(() -> voidMutate(deletes), supplier);
}

@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(actions);
.setOperation(actions)
.setContainerOperations(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}

Expand Down
Loading

0 comments on commit 4748707

Please sign in to comment.