Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport "HBASE-26473 Introduce db.hbase.container_operations span attribute" to branch-2 #4083

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
Expand Down Expand Up @@ -406,7 +407,8 @@ protected Result rpcCall() throws Exception {
public Result[] get(List<Get> gets) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(gets);
return TraceUtil.trace(() -> {
if (gets.size() == 1) {
return new Result[] { get(gets.get(0)) };
Expand All @@ -433,7 +435,8 @@ public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions);
TraceUtil.traceWithIOException(() -> {
int rpcTimeout = writeRpcTimeoutMs;
boolean hasRead = false;
Expand Down Expand Up @@ -473,6 +476,7 @@ public void batch(final List<? extends Row> actions, final Object[] results, int
final Span span = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions)
.build();
try (Scope ignored = span.makeCurrent()) {
AsyncRequestFuture ars = multiAp.submit(task);
Expand All @@ -481,6 +485,7 @@ public void batch(final List<? extends Row> actions, final Object[] results, int
TraceUtil.setError(span, ars.getErrors());
throw ars.getErrors();
}
span.setStatus(StatusCode.OK);
} finally {
span.end();
}
Expand Down Expand Up @@ -512,6 +517,7 @@ public static <R> void doBatchWithCallback(List<? extends Row> actions, Object[]
final Span span = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(actions)
.build();
try (Scope ignored = span.makeCurrent()) {
AsyncRequestFuture ars = connection.getAsyncProcess().submit(task);
Expand Down Expand Up @@ -551,7 +557,8 @@ protected Void rpcCall() throws Exception {
public void delete(final List<Delete> deletes) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(deletes);
TraceUtil.traceWithIOException(() -> {
Object[] results = new Object[deletes.size()];
try {
Expand Down Expand Up @@ -600,7 +607,8 @@ protected Void rpcCall() throws Exception {
public void put(final List<Put> puts) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(puts);
TraceUtil.traceWithIOException(() -> {
for (Put put : puts) {
validatePut(put);
Expand All @@ -618,7 +626,8 @@ public void put(final List<Put> puts) throws IOException {
public Result mutateRow(final RowMutations rm) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(rm);
return TraceUtil.trace(() -> {
long nonceGroup = getNonceGroup();
long nonce = getNonce();
Expand Down Expand Up @@ -773,7 +782,8 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final byte [] value, final Put put) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null, put)
.isSuccess(),
Expand All @@ -786,7 +796,8 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, put).isSuccess(),
Expand All @@ -799,7 +810,8 @@ public boolean checkAndPut(final byte [] row, final byte [] family, final byte [
final CompareOperator op, final byte [] value, final Put put) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.PUT);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, put).isSuccess(),
supplier);
Expand All @@ -811,7 +823,8 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
final byte[] value, final Delete delete) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, CompareOperator.EQUAL, value, null, null,
delete).isSuccess(),
Expand All @@ -824,7 +837,8 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, delete).isSuccess(),
Expand All @@ -837,7 +851,8 @@ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE, HBaseSemanticAttributes.Operation.DELETE);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, delete).isSuccess(),
supplier);
Expand Down Expand Up @@ -914,7 +929,8 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(rm);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, toCompareOperator(compareOp), value, null,
null, rm).isSuccess(),
Expand All @@ -927,7 +943,8 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(rm);
return TraceUtil.trace(
() -> doCheckAndMutate(row, family, qualifier, op, value, null, null, rm).isSuccess(),
supplier);
Expand All @@ -937,7 +954,8 @@ public boolean checkAndMutate(final byte [] row, final byte [] family, final byt
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(checkAndMutate);
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate);
return TraceUtil.trace(() -> {
Row action = checkAndMutate.getAction();
if (action instanceof Put || action instanceof Delete || action instanceof Increment ||
Expand Down Expand Up @@ -986,7 +1004,8 @@ public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMu
throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(checkAndMutates);
return TraceUtil.trace(() -> {
if (checkAndMutates.isEmpty()) {
return Collections.emptyList();
Expand Down Expand Up @@ -1056,7 +1075,8 @@ public boolean exists(final Get get) throws IOException {
public boolean[] exists(List<Get> gets) throws IOException {
final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.setContainerOperations(gets);
return TraceUtil.trace(() -> {
if (gets.isEmpty()) {
return new boolean[] {};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, put,
Expand All @@ -401,7 +402,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
public CompletableFuture<Boolean> thenDelete(Delete delete) {
preCheck();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -417,7 +419,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
preCheck();
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand Down Expand Up @@ -460,7 +463,8 @@ public CheckAndMutateWithFilterBuilder timeRange(TimeRange timeRange) {
public CompletableFuture<Boolean> thenPut(Put put) {
validatePut(put, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(put);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, put.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc,
Expand All @@ -475,7 +479,8 @@ public CompletableFuture<Boolean> thenPut(Put put) {
@Override
public CompletableFuture<Boolean> thenDelete(Delete delete) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(delete);
return tracedFuture(
() -> RawAsyncTableImpl.this.<Boolean> newCaller(row, delete.getPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.mutate(controller, loc, stub, delete,
Expand All @@ -490,7 +495,8 @@ public CompletableFuture<Boolean> thenDelete(Delete delete) {
public CompletableFuture<Boolean> thenMutate(RowMutations mutations) {
validatePutsInRowMutations(mutations, conn.connConf.getMaxKeyValueSize());
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE)
.setContainerOperations(mutations);
return tracedFuture(
() -> RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutations.getMaxPriority(), rpcTimeoutNs)
Expand All @@ -512,7 +518,8 @@ public CheckAndMutateWithFilterBuilder checkAndMutate(byte[] row, Filter filter)
@Override
public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate checkAndMutate) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutate);
.setOperation(checkAndMutate)
.setContainerOperations(checkAndMutate.getAction());
return tracedFuture(() -> {
if (checkAndMutate.getAction() instanceof Put ||
checkAndMutate.getAction() instanceof Delete ||
Expand Down Expand Up @@ -565,7 +572,8 @@ public CompletableFuture<CheckAndMutateResult> checkAndMutate(CheckAndMutate che
public List<CompletableFuture<CheckAndMutateResult>>
checkAndMutate(List<CheckAndMutate> checkAndMutates) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(checkAndMutates);
.setOperation(checkAndMutates)
.setContainerOperations(checkAndMutates);
return tracedFutures(
() -> batch(checkAndMutates, rpcTimeoutNs).stream()
.map(f -> f.thenApply(r -> (CheckAndMutateResult) r)).collect(toList()),
Expand Down Expand Up @@ -621,7 +629,8 @@ public CompletableFuture<Result> mutateRow(RowMutations mutations) {
long nonceGroup = conn.getNonceGenerator().getNonceGroup();
long nonce = conn.getNonceGenerator().newNonce();
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(mutations);
.setOperation(mutations)
.setContainerOperations(mutations);
return tracedFuture(
() -> this
.<Result> newCaller(mutations.getRow(), mutations.getMaxPriority(), writeRpcTimeoutNs)
Expand Down Expand Up @@ -694,28 +703,32 @@ public void onComplete() {
@Override
public List<CompletableFuture<Result>> get(List<Get> gets) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(gets);
.setOperation(gets)
.setContainerOperations(HBaseSemanticAttributes.Operation.GET);
return tracedFutures(() -> batch(gets, readRpcTimeoutNs), supplier);
}

@Override
public List<CompletableFuture<Void>> put(List<Put> puts) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(puts);
.setOperation(puts)
.setContainerOperations(HBaseSemanticAttributes.Operation.PUT);
return tracedFutures(() -> voidMutate(puts), supplier);
}

@Override
public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(deletes);
.setOperation(deletes)
.setContainerOperations(HBaseSemanticAttributes.Operation.DELETE);
return tracedFutures(() -> voidMutate(deletes), supplier);
}

@Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
final Supplier<Span> supplier = newTableOperationSpanBuilder()
.setOperation(actions);
.setOperation(actions)
.setContainerOperations(actions);
return tracedFutures(() -> batch(actions, rpcTimeoutNs), supplier);
}

Expand Down
Loading