From a4c26768960e89a37789a3e857490c1376ef764c Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Wed, 13 Jan 2021 17:09:29 +0800
Subject: [PATCH 1/2] HBASE-25455 Add trace support for HRegion read/write operation

---
 .../hbase/client/AsyncRegionLocator.java      |   3 +-
 .../hadoop/hbase/ipc/AbstractRpcClient.java   |   3 +-
 .../apache/hadoop/hbase/trace/TraceUtil.java  |  40 +++-
 .../apache/hadoop/hbase/ipc/CallRunner.java   |   6 +-
 .../apache/hadoop/hbase/ipc/ServerCall.java   |   4 +-
 .../hadoop/hbase/regionserver/HRegion.java    | 198 ++++++++++--------
 .../hbase/regionserver/RegionScannerImpl.java | 109 +++++-----
 .../regionserver/TestAtomicOperation.java     |   2 +-
 .../regionserver/TestHRegionTracing.java      | 187 +++++++++++++++++
 9 files changed, 402 insertions(+), 150 deletions(-)
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionTracing.java

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 450c32433a99..9219c95c15e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -103,8 +103,7 @@ private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFutur
     CompletableFuture<T> future = action.get();
     FutureUtils.addListener(future, (resp, error) -> {
       if (error != null) {
-        span.recordException(error);
-        span.setStatus(StatusCode.ERROR);
+        TraceUtil.setError(span, error);
       } else {
         List<String> regionNames = getRegionNames.apply(resp);
         if (!regionNames.isEmpty()) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index a1200ff99cd4..f214d9cade28 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -424,8 +424,7 @@ public void run(Call call) {
           onCallFinished(call, hrc, addr, callback);
         } finally {
           if (hrc.failed()) {
-            span.setStatus(StatusCode.ERROR);
-            span.recordException(hrc.getFailed());
+            TraceUtil.setError(span, hrc.getFailed());
           } else {
             span.setStatus(StatusCode.OK);
           }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index 8eb2399a5ecf..ea16df10425c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -26,6 +26,7 @@
 import io.opentelemetry.api.trace.attributes.SemanticAttributes;
 import io.opentelemetry.context.Context;
 import io.opentelemetry.context.Scope;
+import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -58,6 +59,9 @@ public final class TraceUtil {
   public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;
 
+  public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
+    AttributeKey.booleanKey("db.hbase.rowlock.readlock");
+
   private TraceUtil() {
   }
 
@@ -139,14 +143,18 @@ public static <T> List<CompletableFuture<T>> tracedFutures(
     }
   }
 
+  public static void setError(Span span, Throwable error) {
+    span.recordException(error);
+    span.setStatus(StatusCode.ERROR);
+  }
+
   /**
    * Finish the {@code span} when the given {@code future} is completed.
    */
   private static void endSpan(CompletableFuture<?> future, Span span) {
     FutureUtils.addListener(future, (resp, error) -> {
       if (error != null) {
-        span.recordException(error);
-        span.setStatus(StatusCode.ERROR);
+        setError(span, error);
       } else {
         span.setStatus(StatusCode.OK);
       }
@@ -164,8 +172,32 @@ public static void trace(Runnable action, Supplier<Span> creator) {
       action.run();
       span.setStatus(StatusCode.OK);
     } catch (Throwable e) {
-      span.recordException(e);
-      span.setStatus(StatusCode.ERROR);
+      setError(span, e);
+      throw e;
+    } finally {
+      span.end();
+    }
+  }
+
+  @FunctionalInterface
+  public interface IOExceptionCallable<V> {
+    V call() throws IOException;
+  }
+
+  public static <T> T trace(IOExceptionCallable<T> callable, String spanName) throws IOException {
+    return trace(callable, () -> createSpan(spanName));
+  }
+
+  public static <T> T trace(IOExceptionCallable<T> callable, Supplier<Span> creator)
+    throws IOException {
+    Span span = creator.get();
+    try (Scope scope = span.makeCurrent()) {
+      T ret = callable.call();
+      span.setStatus(StatusCode.OK);
+      return ret;
+    } catch (Throwable e) {
+      setError(span, e);
+      throw e;
     } finally {
       span.end();
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 48eb28ef3d30..e57579fe10ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -135,12 +135,10 @@ public void run() {
         resultPair = this.rpcServer.call(call, this.status);
       } catch (TimeoutIOException e){
         RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
-        span.recordException(e);
-        span.setStatus(StatusCode.ERROR);
+        TraceUtil.setError(span, e);
         return;
       } catch (Throwable e) {
-        span.recordException(e);
-        span.setStatus(StatusCode.ERROR);
+        TraceUtil.setError(span, e);
         if (e instanceof ServerNotRunningYetException) {
           // If ServerNotRunningYetException, don't spew stack trace.
           if (RpcServer.LOG.isTraceEnabled()) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index 268a6a10b862..8b1dfb1da190 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
@@ -232,8 +233,7 @@ public synchronized void setResponse(Message m, final CellScanner cells, Throwab
     }
     if (t != null) {
       this.isError = true;
-      span.recordException(t);
-      span.setStatus(StatusCode.ERROR);
+      TraceUtil.setError(span, t);
     } else {
       span.setStatus(StatusCode.OK);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ba60103d3c0d..88ab3a173a62 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -23,6 +23,7 @@
 import edu.umd.cs.findbugs.annotations.Nullable;
 import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.context.Scope;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -3132,24 +3133,26 @@ public RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalS
   }
 
   private RegionScannerImpl getScanner(Scan scan, List<KeyValueScanner> additionalScanners,
-    long nonceGroup, long nonce) throws IOException {
-    startRegionOperation(Operation.SCAN);
-    try {
-      // Verify families are all valid
-      if (!scan.hasFamilies()) {
-        // Adding all families to scanner
-        for (byte[] family : this.htableDescriptor.getColumnFamilyNames()) {
-          scan.addFamily(family);
-        }
-      } else {
-        for (byte[] family : scan.getFamilyMap().keySet()) {
-          checkFamily(family);
+    long nonceGroup, long nonce) throws IOException {
+    return TraceUtil.trace(() -> {
+      startRegionOperation(Operation.SCAN);
+      try {
+        // Verify families are all valid
+        if (!scan.hasFamilies()) {
+          // Adding all families to scanner
+          for (byte[] family : this.htableDescriptor.getColumnFamilyNames()) {
+            scan.addFamily(family);
+          }
+        } else {
+          for (byte[] family : scan.getFamilyMap().keySet()) {
+            checkFamily(family);
+          }
         }
+        return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);
+      } finally {
+        closeRegionOperation(Operation.SCAN);
       }
-      return instantiateRegionScanner(scan, additionalScanners, nonceGroup, nonce);
-    } finally {
-      closeRegionOperation(Operation.SCAN);
-    }
+    }, () -> createRegionSpan("Region.getScanner"));
   }
 
   protected RegionScannerImpl instantiateRegionScanner(Scan scan,
@@ -3186,15 +3189,17 @@ private void prepareDelete(Delete delete) throws IOException {
 
   @Override
   public void delete(Delete delete) throws IOException {
-    checkReadOnly();
-    checkResources();
-    startRegionOperation(Operation.DELETE);
-    try {
-      // All edits for the given row (across all column families) must happen atomically.
-      mutate(delete);
-    } finally {
-      closeRegionOperation(Operation.DELETE);
-    }
+    TraceUtil.trace(() -> {
+      checkReadOnly();
+      checkResources();
+      startRegionOperation(Operation.DELETE);
+      try {
+        // All edits for the given row (across all column families) must happen atomically.
+        return mutate(delete);
+      } finally {
+        closeRegionOperation(Operation.DELETE);
+      }
+    }, () -> createRegionSpan("Region.delete"));
   }
 
   /**
@@ -3264,20 +3269,22 @@ private void updateDeleteLatestVersionTimestamp(Cell cell, Get get, int count, b
 
   @Override
   public void put(Put put) throws IOException {
-    checkReadOnly();
+    TraceUtil.trace(() -> {
+      checkReadOnly();
 
-    // Do a rough check that we have resources to accept a write. The check is
-    // 'rough' in that between the resource check and the call to obtain a
-    // read lock, resources may run out. For now, the thought is that this
-    // will be extremely rare; we'll deal with it when it happens.
-    checkResources();
-    startRegionOperation(Operation.PUT);
-    try {
-      // All edits for the given row (across all column families) must happen atomically.
-      mutate(put);
-    } finally {
-      closeRegionOperation(Operation.PUT);
-    }
+      // Do a rough check that we have resources to accept a write. The check is
+      // 'rough' in that between the resource check and the call to obtain a
+      // read lock, resources may run out. For now, the thought is that this
+      // will be extremely rare; we'll deal with it when it happens.
+      checkResources();
+      startRegionOperation(Operation.PUT);
+      try {
+        // All edits for the given row (across all column families) must happen atomically.
+        return mutate(put);
+      } finally {
+        closeRegionOperation(Operation.PUT);
+      }
+    }, () -> createRegionSpan("Region.put"));
   }
 
   /**
@@ -3561,7 +3568,7 @@ public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(
       boolean throwException = false;
       try {
         // if atomic then get exclusive lock, else shared lock
-        rowLock = region.getRowLockInternal(mutation.getRow(), !isAtomic(), prevRowLock);
+        rowLock = region.getRowLock(mutation.getRow(), !isAtomic(), prevRowLock);
       } catch (TimeoutIOException | InterruptedIOException e) {
         // NOTE: We will retry when other exceptions, but we should stop if we receive
         // TimeoutIOException or InterruptedIOException as operation has timed out or
@@ -4333,7 +4340,7 @@ private void checkAndMergeCPMutations(final MiniBatchOperationInProgress<Mutati
 
   public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic) throws IOException {
-    return batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE);
+    return TraceUtil.trace(
+      () -> batchMutate(mutations, atomic, HConstants.NO_NONCE, HConstants.NO_NONCE),
+      () -> createRegionSpan("Region.batchMutate"));
   }
 
   public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
@@ -4779,8 +4788,8 @@ public boolean checkAndRowMutate(byte[] row, Filter filter, TimeRange timeRange,
     return checkAndMutate(checkAndMutate).isSuccess();
   }
 
-  @Override
-  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+  private CheckAndMutateResult checkAndMutateInternal(CheckAndMutate checkAndMutate)
+    throws IOException {
     byte[] row = checkAndMutate.getRow();
     Filter filter = null;
     byte[] family = null;
@@ -4829,7 +4838,7 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
     }
     // Lock row - note that doBatchMutate will relock this row if called
     checkRow(row, "doCheckAndRowMutate");
-    RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
+    RowLock rowLock = getRowLock(get.getRow(), false, null);
     try {
       if (this.getCoprocessorHost() != null) {
         CheckAndMutateResult result =
@@ -4839,7 +4848,7 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
         }
       }
 
-      // NOTE: We used to wait here until mvcc caught up:  mvcc.await();
+      // NOTE: We used to wait here until mvcc caught up: mvcc.await();
       // Supposition is that now all changes are done under row locks, then when we go to read,
       // we'll get the latest on this row.
       List<Cell> result = get(get, false);
@@ -4881,7 +4890,7 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
         // And else 'delete' is not needed since it already does a second get, and sets the
         // timestamp from get (see prepareDeleteTimestamps).
       } else {
-        for (Mutation m: rowMutations.getMutations()) {
+        for (Mutation m : rowMutations.getMutations()) {
           if (m instanceof Put) {
             updateCellTimestamps(m.getFamilyCellMap().values(), byteTs);
           }
@@ -4909,8 +4918,13 @@ public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws
     }
   }
 
-  private void checkMutationType(final Mutation mutation)
-    throws DoNotRetryIOException {
+  @Override
+  public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
+    return TraceUtil.trace(() -> checkAndMutateInternal(checkAndMutate),
+      () -> createRegionSpan("Region.checkAndMutate"));
+  }
+
+  private void checkMutationType(final Mutation mutation) throws DoNotRetryIOException {
     if (!(mutation instanceof Put) && !(mutation instanceof Delete) &&
       !(mutation instanceof Increment) && !(mutation instanceof Append)) {
       throw new org.apache.hadoop.hbase.DoNotRetryIOException(
@@ -6574,11 +6588,17 @@ public RowLock getRowLock(byte[] row) throws IOException {
   @Override
   public RowLock getRowLock(byte[] row, boolean readLock) throws IOException {
     checkRow(row, "row lock");
-    return getRowLockInternal(row, readLock, null);
+    return getRowLock(row, readLock, null);
   }
 
-  protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock prevRowLock)
-    throws IOException {
+  Span createRegionSpan(String name) {
+    return TraceUtil.createSpan(name).setAttribute(TraceUtil.REGION_NAMES_KEY,
+      Arrays.asList(getRegionInfo().getRegionNameAsString()));
+  }
+
+  // will be overridden in tests
+  protected RowLock getRowLockInternal(byte[] row, boolean readLock, RowLock prevRowLock)
+    throws IOException {
     // create an object to use a a key in the row lock map
     HashedBytes rowKey = new HashedBytes(row);
@@ -6586,9 +6606,7 @@ protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock
     RowLockImpl result = null;
     boolean success = false;
 
-    Span span = TraceUtil.getGlobalTracer().spanBuilder("HRegion.getRowLock").startSpan();
-    try (Scope scope = span.makeCurrent()) {
-      span.addEvent("Getting a " + (readLock ? "readLock" : "writeLock"));
+    try {
       // Keep trying until we have a lock or error out.
       // TODO: do we need to add a time component here?
       while (result == null) {
@@ -6625,7 +6643,6 @@ protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock
         }
 
         if (timeout <= 0 || !result.getLock().tryLock(timeout, TimeUnit.MILLISECONDS)) {
-          span.addEvent("Failed to get row lock");
           String message = "Timed out waiting for lock for row: " + rowKey + " in region " +
             getRegionInfo().getEncodedName();
           if (reachDeadlineFirst) {
@@ -6643,7 +6660,6 @@ protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock
         LOG.debug("Thread interrupted waiting for lock on row: {}, in region {}", rowKey,
           getRegionInfo().getRegionNameAsString());
       }
-      span.addEvent("Interrupted exception getting row lock");
       throw throwOnInterrupt(ie);
     } catch (Error error) {
@@ -6652,17 +6668,22 @@ protected RowLock getRowLockInternal(byte[] row, boolean readLock, final RowLock
       LOG.warn("Error to get row lock for {}, in region {}, cause: {}", Bytes.toStringBinary(row),
         getRegionInfo().getRegionNameAsString(), error);
       IOException ioe = new IOException(error);
-      span.addEvent("Error getting row lock");
       throw ioe;
     } finally {
       // Clean up the counts just in case this was the thing keeping the context alive.
       if (!success && rowLockContext != null) {
         rowLockContext.cleanUp();
       }
-      span.end();
     }
   }
 
+  private RowLock getRowLock(byte[] row, boolean readLock, final RowLock prevRowLock)
+    throws IOException {
+    return TraceUtil.trace(() -> getRowLockInternal(row, readLock, prevRowLock),
+      () -> createRegionSpan("Region.getRowLock").setAttribute(TraceUtil.ROW_LOCK_READ_LOCK_KEY,
+        readLock));
+  }
+
   private void releaseRowLocks(List<RowLock> rowLocks) {
     if (rowLocks != null) {
       for (RowLock rowLock : rowLocks) {
@@ -7524,9 +7545,15 @@ public List<Cell> get(Get get, boolean withCoprocessor) throws IOException {
   }
 
   private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
-    throws IOException {
+    throws IOException {
+    return TraceUtil.trace(() -> getInternal(get, withCoprocessor, nonceGroup, nonce),
+      () -> createRegionSpan("Region.get"));
+  }
+
+  private List<Cell> getInternal(Get get, boolean withCoprocessor, long nonceGroup, long nonce)
+    throws IOException {
     List<Cell> results = new ArrayList<>();
-    long before = EnvironmentEdgeManager.currentTime();
+    long before = EnvironmentEdgeManager.currentTime();
 
     // pre-get CP hook
     if (withCoprocessor && (coprocessorHost != null)) {
@@ -7539,13 +7566,8 @@ private List<Cell> get(Get get, boolean withCoprocessor, long nonceGroup, long n
     if (scan.getLoadColumnFamiliesOnDemandValue() == null) {
       scan.setLoadColumnFamiliesOnDemand(isLoadingCfsOnDemandDefault());
     }
-    RegionScanner scanner = null;
-    try {
-      scanner = getScanner(scan, null, nonceGroup, nonce);
+    try (RegionScanner scanner = getScanner(scan, null, nonceGroup, nonce)) {
       scanner.next(results);
-    } finally {
-      if (scanner != null)
-        scanner.close();
     }
 
     // post-get CP hook
@@ -7614,7 +7636,7 @@ public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(
       RowLock prevRowLock = null;
       for (byte[] row : rowsToLock) {
         try {
-          RowLock rowLock = region.getRowLockInternal(row, false, prevRowLock); // write lock
+          RowLock rowLock = region.getRowLock(row, false, prevRowLock); // write lock
           if (rowLock != prevRowLock) {
             acquiredRowLocks.add(rowLock);
             prevRowLock = rowLock;
@@ -7715,7 +7737,7 @@ public void processRowsWithLocks(RowProcessor<?, ?> processor, long timeout,
       for (byte[] row : rowsToLock) {
         // Attempt to lock all involved rows, throw if any lock times out
         // use a writer lock for mixed reads and writes
-        RowLock rowLock = getRowLockInternal(row, false, prevRowLock);
+        RowLock rowLock = getRowLock(row, false, prevRowLock);
         if (rowLock != prevRowLock) {
           acquiredRowLocks.add(rowLock);
           prevRowLock = rowLock;
@@ -7882,15 +7904,17 @@ public Result append(Append append) throws IOException {
   }
 
   public Result append(Append append, long nonceGroup, long nonce) throws IOException {
-    checkReadOnly();
-    checkResources();
-    startRegionOperation(Operation.APPEND);
-    try {
-      // All edits for the given row (across all column families) must happen atomically.
-      return mutate(append, true, nonceGroup, nonce).getResult();
-    } finally {
-      closeRegionOperation(Operation.APPEND);
-    }
+    return TraceUtil.trace(() -> {
+      checkReadOnly();
+      checkResources();
+      startRegionOperation(Operation.APPEND);
+      try {
+        // All edits for the given row (across all column families) must happen atomically.
+        return mutate(append, true, nonceGroup, nonce).getResult();
+      } finally {
+        closeRegionOperation(Operation.APPEND);
+      }
+    }, () -> createRegionSpan("Region.append"));
   }
 
   @Override
@@ -7899,15 +7923,17 @@ public Result increment(Increment increment) throws IOException {
   }
 
   public Result increment(Increment increment, long nonceGroup, long nonce) throws IOException {
-    checkReadOnly();
-    checkResources();
-    startRegionOperation(Operation.INCREMENT);
-    try {
-      // All edits for the given row (across all column families) must happen atomically.
-      return mutate(increment, true, nonceGroup, nonce).getResult();
-    } finally {
-      closeRegionOperation(Operation.INCREMENT);
-    }
+    return TraceUtil.trace(() -> {
+      checkReadOnly();
+      checkResources();
+      startRegionOperation(Operation.INCREMENT);
+      try {
+        // All edits for the given row (across all column families) must happen atomically.
+        return mutate(increment, true, nonceGroup, nonce).getResult();
+      } finally {
+        closeRegionOperation(Operation.INCREMENT);
+      }
+    }, () -> createRegionSpan("Region.increment"));
   }
 
   private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 5d81687cbf45..c24b0d47a9a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -254,42 +255,44 @@ public boolean nextRaw(List<Cell> outResults) throws IOException {
 
   @Override
   public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) throws IOException {
-    if (storeHeap == null) {
-      // scanner is closed
-      throw new UnknownScannerException("Scanner was closed");
-    }
-    boolean moreValues = false;
-    if (outResults.isEmpty()) {
-      // Usually outResults is empty. This is true when next is called
-      // to handle scan or get operation.
-      moreValues = nextInternal(outResults, scannerContext);
-    } else {
-      List<Cell> tmpList = new ArrayList<>();
-      moreValues = nextInternal(tmpList, scannerContext);
-      outResults.addAll(tmpList);
-    }
+    return TraceUtil.trace(() -> {
+      if (storeHeap == null) {
+        // scanner is closed
+        throw new UnknownScannerException("Scanner was closed");
+      }
+      boolean moreValues = false;
+      if (outResults.isEmpty()) {
+        // Usually outResults is empty. This is true when next is called
+        // to handle scan or get operation.
+        moreValues = nextInternal(outResults, scannerContext);
+      } else {
+        List<Cell> tmpList = new ArrayList<>();
+        moreValues = nextInternal(tmpList, scannerContext);
+        outResults.addAll(tmpList);
+      }
 
-    if (!outResults.isEmpty()) {
-      region.addReadRequestsCount(1);
-      if (region.getMetrics() != null) {
-        region.getMetrics().updateReadRequestCount();
+      if (!outResults.isEmpty()) {
+        region.addReadRequestsCount(1);
+        if (region.getMetrics() != null) {
+          region.getMetrics().updateReadRequestCount();
+        }
+      }
+      if (rsServices != null && rsServices.getMetrics() != null) {
+        rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable());
       }
-    }
-    if (rsServices != null && rsServices.getMetrics() != null) {
-      rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable());
-    }
 
-    // If the size limit was reached it means a partial Result is being returned. Returning a
-    // partial Result means that we should not reset the filters; filters should only be reset in
-    // between rows
-    if (!scannerContext.mayHaveMoreCellsInRow()) {
-      resetFilters();
-    }
+      // If the size limit was reached it means a partial Result is being returned. Returning a
+      // partial Result means that we should not reset the filters; filters should only be reset in
+      // between rows
+      if (!scannerContext.mayHaveMoreCellsInRow()) {
+        resetFilters();
+      }
 
-    if (isFilterDoneInternal()) {
-      moreValues = false;
-    }
-    return moreValues;
+      if (isFilterDoneInternal()) {
+        moreValues = false;
+      }
+      return moreValues;
+    }, () -> region.createRegionSpan("RegionScanner.next"));
   }
 
   /**
@@ -728,8 +731,9 @@ protected boolean shouldStop(Cell currentRowCell) {
     return c > 0 || (c == 0 && !includeStopRow);
   }
 
-  @Override
-  public synchronized void close() {
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
+    justification = "this method is only called inside close which is synchronized")
+  private void closeInternal() {
     if (storeHeap != null) {
       storeHeap.close();
       storeHeap = null;
@@ -743,24 +747,31 @@ public synchronized void close() {
     this.filterClosed = true;
   }
 
+  @Override
+  public synchronized void close() {
+    TraceUtil.trace(this::closeInternal, () -> region.createRegionSpan("RegionScanner.close"));
+  }
+
   @Override
   public synchronized boolean reseek(byte[] row) throws IOException {
-    if (row == null) {
-      throw new IllegalArgumentException("Row cannot be null.");
-    }
-    boolean result = false;
-    region.startRegionOperation();
-    Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
-    try {
-      // use request seek to make use of the lazy seek option. See HBASE-5520
-      result = this.storeHeap.requestSeek(kv, true, true);
-      if (this.joinedHeap != null) {
-        result = this.joinedHeap.requestSeek(kv, true, true) || result;
+    return TraceUtil.trace(() -> {
+      if (row == null) {
+        throw new IllegalArgumentException("Row cannot be null.");
       }
-    } finally {
-      region.closeRegionOperation();
-    }
-    return result;
+      boolean result = false;
+      region.startRegionOperation();
+      Cell kv = PrivateCellUtil.createFirstOnRow(row, 0, (short) row.length);
+      try {
+        // use request seek to make use of the lazy seek option. See HBASE-5520
+        result = this.storeHeap.requestSeek(kv, true, true);
+        if (this.joinedHeap != null) {
+          result = this.joinedHeap.requestSeek(kv, true, true) || result;
+        }
+      } finally {
+        region.closeRegionOperation();
+      }
+      return result;
+    }, () -> region.createRegionSpan("RegionScanner.reseek"));
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index 6918eda2ff54..95ea37e9aea0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -725,7 +725,7 @@ public MockHRegion(Path tableDir, WAL log, FileSystem fs, Configuration conf,
     }
 
     @Override
-    public RowLock getRowLockInternal(final byte[] row, boolean readLock,
+    protected RowLock getRowLockInternal(final byte[] row, boolean readLock,
       final RowLock prevRowlock) throws IOException {
       if (testStep == TestStep.CHECKANDPUT_STARTED) {
         latch.countDown();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionTracing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionTracing.java
new file mode 100644
index 000000000000..3a772a28059a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionTracing.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertTrue; + +import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestHRegionTracing { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestHRegionTracing.class); + + private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static byte[] FAMILY = Bytes.toBytes("family"); + + private static byte[] QUALIFIER = Bytes.toBytes("qual"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] VALUE = Bytes.toBytes("value"); + + @Rule + public final OpenTelemetryRule traceRule = OpenTelemetryRule.create(); + + @Rule + public final TableNameTestRule tableNameRule = new TableNameTestRule(); + + private static WAL WAL; + + private HRegion region; + + @BeforeClass + public static void setUpBeforeClass() throws IOException { + WAL = HBaseTestingUtility.createWal(UTIL.getConfiguration(), UTIL.getDataTestDir(), null); + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + Closeables.close(WAL, true); + UTIL.cleanupTestDir(); + } + + @Before + public void setUp() throws IOException { + TableName tableName = tableNameRule.getTableName(); + TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); + RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); + ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, + MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); + region = HRegion.createHRegion(info, UTIL.getDataTestDir(), UTIL.getConfiguration(), desc, WAL); + region = UTIL.createLocalHRegion(info, desc); + } + + @After + public void tearDown() throws IOException { + if (region != null) { + region.close(); + } + } + + private void assertSpan(String spanName) { + 
+    assertTrue(traceRule.getSpans().stream().anyMatch(span -> {
+      if (!span.getName().equals(spanName)) {
+        return false;
+      }
+      List<String> regionNames = span.getAttributes().get(TraceUtil.REGION_NAMES_KEY);
+      return regionNames != null && regionNames.size() == 1 &&
+        regionNames.get(0).equals(region.getRegionInfo().getRegionNameAsString());
+    }));
+  }
+
+  @Test
+  public void testGet() throws IOException {
+    region.get(new Get(ROW));
+    assertSpan("Region.get");
+  }
+
+  @Test
+  public void testPut() throws IOException {
+    region.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
+    assertSpan("Region.put");
+    assertSpan("Region.getRowLock");
+  }
+
+  @Test
+  public void testDelete() throws IOException {
+    region.delete(new Delete(ROW).addColumn(FAMILY, QUALIFIER));
+    assertSpan("Region.delete");
+    assertSpan("Region.getRowLock");
+  }
+
+  @Test
+  public void testAppend() throws IOException {
+    region.append(new Append(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
+    assertSpan("Region.append");
+    assertSpan("Region.getRowLock");
+  }
+
+  @Test
+  public void testIncrement() throws IOException {
+    region.increment(new Increment(ROW).addColumn(FAMILY, QUALIFIER, 1));
+    assertSpan("Region.increment");
+    assertSpan("Region.getRowLock");
+  }
+
+  @Test
+  public void testBatchMutate() throws IOException {
+    region.batchMutate(new Mutation[] { new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE) });
+    assertSpan("Region.batchMutate");
+    assertSpan("Region.getRowLock");
+  }
+
+  @Test
+  public void testCheckAndMutate() throws IOException {
+    region.checkAndMutate(CheckAndMutate.newBuilder(ROW).ifNotExists(FAMILY, QUALIFIER)
+      .build(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)));
+    assertSpan("Region.checkAndMutate");
+    assertSpan("Region.getRowLock");
+  }
+
+  @Test
+  public void testScanner() throws IOException {
+    try (RegionScanner scanner = region.getScanner(new Scan())) {
+      scanner.reseek(ROW);
+      scanner.next(new ArrayList<>());
+    }
+    assertSpan("Region.getScanner");
+    assertSpan("RegionScanner.reseek");
+    assertSpan("RegionScanner.next");
+    assertSpan("RegionScanner.close");
+  }
+}

From 32ae28f9e9bdc5301c8caa0aa1daee9db025027d Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Wed, 13 Jan 2021 22:48:12 +0800
Subject: [PATCH 2/2] remove unused import

---
 .../main/java/org/apache/hadoop/hbase/regionserver/HRegion.java | 2 --
 1 file changed, 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 88ab3a173a62..8d68c94e6881 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -23,8 +23,6 @@
 import edu.umd.cs.findbugs.annotations.Nullable;
 import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.api.trace.StatusCode;
-import io.opentelemetry.context.Scope;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
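
Reviewer note (not part of the patches above): a minimal sketch of how the new
TraceUtil.trace overloads from patch 1 are meant to be consumed. Only
TraceUtil.trace, IOExceptionCallable and TraceUtil.setError come from this
change; the surrounding class and the loadRow method are hypothetical
placeholders.

import java.io.IOException;
import org.apache.hadoop.hbase.trace.TraceUtil;

public class TraceUtilUsageSketch {

  // Hypothetical IO-throwing operation standing in for a real region read.
  private static byte[] loadRow(byte[] row) throws IOException {
    return row;
  }

  // TraceUtil.trace(IOExceptionCallable, String) opens a span named
  // "Example.loadRow", makes it current while the callable runs, sets status
  // OK on success, records the exception and sets status ERROR on failure
  // (via TraceUtil.setError), and always ends the span -- the same pattern
  // HRegion.put/delete/append/increment use in patch 1.
  public static byte[] tracedLoadRow(byte[] row) throws IOException {
    return TraceUtil.trace(() -> loadRow(row), "Example.loadRow");
  }
}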
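A second sketch in the same spirit: TestHRegionTracing only asserts the region
name attribute, but the ROW_LOCK_READ_LOCK_KEY attribute that patch 1 sets on
Region.getRowLock spans can be checked the same way. Everything referenced here
(OpenTelemetryRule#getSpans, TraceUtil.ROW_LOCK_READ_LOCK_KEY) is part of this
change; the helper itself is a hypothetical addition to TestHRegionTracing.

  // Assert that some Region.getRowLock span recorded an exclusive (write)
  // lock, i.e. db.hbase.rowlock.readlock == false.
  private void assertWriteLockSpan() {
    assertTrue(traceRule.getSpans().stream()
      .filter(span -> span.getName().equals("Region.getRowLock"))
      .anyMatch(span -> Boolean.FALSE
        .equals(span.getAttributes().get(TraceUtil.ROW_LOCK_READ_LOCK_KEY))));
  }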