diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index 48f004c0a29c..d7984628319a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,7 +27,9 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote; import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; - +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -35,11 +37,11 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.yetus.audience.InterfaceAudience; - import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface; @@ -85,6 +87,8 @@ class AsyncClientScanner { private final ScanResultCache resultCache; + private final Span span; + public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName, AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { @@ -112,6 +116,21 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN } else { this.scanMetrics = null; } + + /* + * Assumes that the `start()` method is called immediately after construction. If this is no + * longer the case, for tracing correctness, we should move the start of the span into the + * `start()` method. The cost of doing so would be making access to the `span` safe for + * concurrent threads. + */ + span = new TableOperationSpanBuilder(conn) + .setTableName(tableName) + .setOperation(scan) + .build(); + if (consumer instanceof AsyncTableResultScanner) { + AsyncTableResultScanner scanner = (AsyncTableResultScanner) consumer; + scanner.setSpan(span); + } } private static final class OpenScannerResponse { @@ -140,26 +159,35 @@ public OpenScannerResponse(HRegionLocation loc, boolean isRegionServerRemote, In private CompletableFuture callOpenScanner(HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub) { - boolean isRegionServerRemote = isRemote(loc.getHostname()); - incRPCCallsMetrics(scanMetrics, isRegionServerRemote); - if (openScannerTries.getAndIncrement() > 1) { - incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); + try (Scope ignored = span.makeCurrent()) { + boolean isRegionServerRemote = isRemote(loc.getHostname()); + incRPCCallsMetrics(scanMetrics, isRegionServerRemote); + if (openScannerTries.getAndIncrement() > 1) { + incRPCRetriesMetrics(scanMetrics, isRegionServerRemote); + } + CompletableFuture future = new CompletableFuture<>(); + try { + ScanRequest request = RequestConverter.buildScanRequest( + loc.getRegion().getRegionName(), scan, scan.getCaching(), false); + stub.scan(controller, request, resp -> { + try (Scope ignored1 = span.makeCurrent()) { + if (controller.failed()) { + final IOException e = controller.getFailed(); + future.completeExceptionally(e); + TraceUtil.setError(span, e); + span.end(); + return; + } + future.complete(new OpenScannerResponse( + loc, isRegionServerRemote, stub, controller, resp)); + } + }); + } catch (IOException e) { + // span is closed by listener attached to the Future in `openScanner()` + future.completeExceptionally(e); + } + return future; } - CompletableFuture future = new CompletableFuture<>(); - try { - ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan, - scan.getCaching(), false); - stub.scan(controller, request, resp -> { - if (controller.failed()) { - future.completeExceptionally(controller.getFailed()); - return; - } - future.complete(new OpenScannerResponse(loc, isRegionServerRemote, stub, controller, resp)); - }); - } catch (IOException e) { - future.completeExceptionally(e); - } - return future; } private void startScan(OpenScannerResponse resp) { @@ -173,26 +201,40 @@ private void startScan(OpenScannerResponse resp) { .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp), (hasMore, error) -> { - if (error != null) { - consumer.onError(error); - return; - } - if (hasMore) { - openScanner(); - } else { - consumer.onComplete(); + try (Scope ignored = span.makeCurrent()) { + if (error != null) { + try { + consumer.onError(error); + return; + } finally { + TraceUtil.setError(span, error); + span.end(); + } + } + if (hasMore) { + openScanner(); + } else { + try { + consumer.onComplete(); + } finally { + span.setStatus(StatusCode.OK); + span.end(); + } + } } }); } private CompletableFuture openScanner(int replicaId) { - return conn.callerFactory. single().table(tableName) - .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) - .priority(scan.getPriority()) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) - .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) - .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); + try (Scope ignored = span.makeCurrent()) { + return conn.callerFactory. single().table(tableName) + .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan)) + .priority(scan.getPriority()) + .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) + .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) + .pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) + .startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call(); + } } private long getPrimaryTimeoutNs() { @@ -206,15 +248,24 @@ private void openScanner() { addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(), getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer, conn.getConnectionMetrics()), (resp, error) -> { - if (error != null) { - consumer.onError(error); - return; + try (Scope ignored = span.makeCurrent()) { + if (error != null) { + try { + consumer.onError(error); + return; + } finally { + TraceUtil.setError(span, error); + span.end(); + } + } + startScan(resp); } - startScan(resp); }); } public void start() { - openScanner(); + try (Scope ignored = span.makeCurrent()) { + openScanner(); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 7f19180a0ab2..48e038ecd2e7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -27,7 +27,8 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.client.ConnectionUtils.updateResultsMetrics; import static org.apache.hadoop.hbase.client.ConnectionUtils.updateServerSideMetrics; - +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -50,11 +51,9 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hbase.thirdparty.io.netty.util.Timer; - import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; @@ -573,7 +572,12 @@ private void call() { resetController(controller, callTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, nextCallSeq, scan.isScanMetricsEnabled(), false, scan.getLimit()); - stub.scan(controller, req, resp -> onComplete(controller, resp)); + final Context context = Context.current(); + stub.scan(controller, req, resp -> { + try (Scope ignored = context.makeCurrent()) { + onComplete(controller, resp); + } + }); } private void next() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index f6c7d82b0201..0bf3179673db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -231,22 +233,29 @@ public ResultScanner getScanner(Scan scan) { } private void scan0(Scan scan, ScanResultConsumer consumer) { - try (ResultScanner scanner = getScanner(scan)) { - consumer.onScanMetricsCreated(scanner.getScanMetrics()); - for (Result result; (result = scanner.next()) != null;) { - if (!consumer.onNext(result)) { - break; + Span span = null; + try (AsyncTableResultScanner scanner = rawTable.getScanner(scan)) { + span = scanner.getSpan(); + try (Scope ignored = span.makeCurrent()) { + consumer.onScanMetricsCreated(scanner.getScanMetrics()); + for (Result result; (result = scanner.next()) != null; ) { + if (!consumer.onNext(result)) { + break; + } } + consumer.onComplete(); } - consumer.onComplete(); } catch (IOException e) { - consumer.onError(e); + try (Scope ignored = span.makeCurrent()) { + consumer.onError(e); + } } } @Override public void scan(Scan scan, ScanResultConsumer consumer) { - pool.execute(() -> scan0(scan, consumer)); + final Context context = Context.current(); + pool.execute(context.wrap(() -> scan0(scan, consumer))); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index e9b15f999a93..6462cd093f85 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize; - +import io.opentelemetry.api.trace.Span; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayDeque; @@ -58,6 +58,9 @@ class AsyncTableResultScanner implements ResultScanner, AdvancedScanResultConsum private ScanResumer resumer; + // Used to pass the span instance to the `AsyncTableImpl` from its underlying `rawAsyncTable`. + private Span span = null; + public AsyncTableResultScanner(TableName tableName, Scan scan, long maxCacheSize) { this.tableName = tableName; this.maxCacheSize = maxCacheSize; @@ -79,6 +82,14 @@ private void stopPrefetch(ScanController controller) { resumer = controller.suspend(); } + Span getSpan() { + return span; + } + + void setSpan(final Span span) { + this.span = span; + } + @Override public synchronized void onNext(Result[] results, ScanController controller) { assert results.length > 0; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 9c283ca021a7..c3d16e78ebf4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -640,30 +640,26 @@ public AsyncTableResultScanner getScanner(Scan scan) { @Override public CompletableFuture> scanAll(Scan scan) { - final Supplier supplier = newTableOperationSpanBuilder() - .setOperation(scan); - return tracedFuture(() -> { - CompletableFuture> future = new CompletableFuture<>(); - List scanResults = new ArrayList<>(); - scan(scan, new AdvancedScanResultConsumer() { + CompletableFuture> future = new CompletableFuture<>(); + List scanResults = new ArrayList<>(); + scan(scan, new AdvancedScanResultConsumer() { - @Override - public void onNext(Result[] results, ScanController controller) { - scanResults.addAll(Arrays.asList(results)); - } + @Override + public void onNext(Result[] results, ScanController controller) { + scanResults.addAll(Arrays.asList(results)); + } - @Override - public void onError(Throwable error) { - future.completeExceptionally(error); - } + @Override + public void onError(Throwable error) { + future.completeExceptionally(error); + } - @Override - public void onComplete() { - future.complete(scanResults); - } - }); - return future; - }, supplier); + @Override + public void onComplete() { + future.complete(scanResults); + } + }); + return future; } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index ae3bc3a00319..69cd77668dc7 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -28,6 +28,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.fail; @@ -44,8 +46,10 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -106,7 +110,7 @@ public class TestAsyncTableTracing { private AsyncConnectionImpl conn; - private AsyncTable table; + private AsyncTable table; @Rule public OpenTelemetryRule traceRule = OpenTelemetryRule.create(); @@ -452,6 +456,53 @@ public void testScanAll() { assertTrace("SCAN"); } + @Test + public void testScan() throws Throwable { + final CountDownLatch doneSignal = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(); + final AtomicReference throwable = new AtomicReference<>(); + final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); + table.scan(scan, new ScanResultConsumer() { + @Override public boolean onNext(Result result) { + if (result.getRow() != null) { + count.incrementAndGet(); + } + return true; + } + + @Override public void onError(Throwable error) { + throwable.set(error); + doneSignal.countDown(); + } + + @Override public void onComplete() { + doneSignal.countDown(); + } + }); + doneSignal.await(); + if (throwable.get() != null) { + throw throwable.get(); + } + assertThat("user code did not run. check test setup.", count.get(), greaterThan(0)); + assertTrace("SCAN"); + } + + @Test + public void testGetScanner() { + final Scan scan = new Scan().setCaching(1).setMaxResultSize(1).setLimit(1); + try (ResultScanner scanner = table.getScanner(scan)) { + int count = 0; + for (Result result : scanner) { + if (result.getRow() != null) { + count++; + } + } + // do something with it. + assertThat(count, greaterThanOrEqualTo(0)); + } + assertTrace("SCAN"); + } + @Test public void testExistsList() { CompletableFuture diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java index 01aa61805a21..026deb0afe45 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client.trace.hamcrest; +import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import io.opentelemetry.api.common.Attributes; @@ -25,7 +26,9 @@ import io.opentelemetry.sdk.trace.data.EventData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.StatusData; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.time.Duration; +import java.util.Objects; import org.hamcrest.Description; import org.hamcrest.FeatureMatcher; import org.hamcrest.Matcher; @@ -78,6 +81,24 @@ public static Matcher hasEvents(Matcher> m }; } + public static Matcher hasExceptionWithType(Matcher matcher) { + return hasException(containsEntry(is(SemanticAttributes.EXCEPTION_TYPE), matcher)); + } + + public static Matcher hasException(Matcher matcher) { + return new FeatureMatcher(matcher, + "SpanData having Exception with Attributes that", "exception attributes") { + @Override protected Attributes featureValueOf(SpanData actual) { + return actual.getEvents() + .stream() + .filter(e -> Objects.equals(SemanticAttributes.EXCEPTION_EVENT_NAME, e.getName())) + .map(EventData::getAttributes) + .findFirst() + .orElse(null); + } + }; + } + public static Matcher hasKind(SpanKind kind) { return new FeatureMatcher( equalTo(kind), "SpanData with kind that", "SpanKind") { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java index afd74bb9314f..30c54244dfc9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/StartTestingClusterOption.java @@ -264,6 +264,11 @@ public Builder numZkServers(int numZkServers) { return this; } + public Builder numWorkers(int numWorkers) { + return numDataNodes(numWorkers) + .numRegionServers(numWorkers); + } + public Builder createRootDir(boolean createRootDir) { this.createRootDir = createRootDir; return this; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java index 14456eca1b13..ba160f1d7c3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableScan.java @@ -17,29 +17,95 @@ */ package org.apache.hadoop.hbase.client; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasProperty; +import static org.hamcrest.Matchers.isA; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.fail; +import io.opentelemetry.sdk.trace.data.SpanData; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ConnectionRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.MatcherPredicate; +import org.apache.hadoop.hbase.MiniClusterRule; +import org.apache.hadoop.hbase.StartTestingClusterOption; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; +import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; +import org.apache.hadoop.hbase.trace.OpenTelemetryTestRule; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Pair; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.hamcrest.Matcher; +import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExternalResource; +import org.junit.rules.RuleChain; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; public abstract class AbstractTestAsyncTableScan { - protected static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + protected static final OpenTelemetryClassRule otelClassRule = OpenTelemetryClassRule.create(); + protected static final MiniClusterRule miniClusterRule = MiniClusterRule.newBuilder() + .setMiniClusterOption(StartTestingClusterOption.builder() + .numWorkers(3) + .build()) + .build(); + + protected static final ConnectionRule connectionRule = + ConnectionRule.createAsyncConnectionRule(miniClusterRule::createAsyncConnection); + + private static final class Setup extends ExternalResource { + @Override + protected void before() throws Throwable { + final HBaseTestingUtil testingUtil = miniClusterRule.getTestingUtility(); + final AsyncConnection conn = connectionRule.getAsyncConnection(); + + byte[][] splitKeys = new byte[8][]; + for (int i = 111; i < 999; i += 111) { + splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); + } + testingUtil.createTable(TABLE_NAME, FAMILY, splitKeys); + testingUtil.waitTableAvailable(TABLE_NAME); + conn.getTable(TABLE_NAME) + .putAll(IntStream.range(0, COUNT) + .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, CQ1, Bytes.toBytes(i)) + .addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) + .collect(Collectors.toList())) + .get(); + } + } + + @ClassRule + public static final TestRule classRule = RuleChain.outerRule(otelClassRule) + .around(miniClusterRule) + .around(connectionRule) + .around(new Setup()); + + @Rule + public final OpenTelemetryTestRule otelTestRule = new OpenTelemetryTestRule(otelClassRule); + + @Rule + public final TestName testName = new TestName(); protected static TableName TABLE_NAME = TableName.valueOf("async"); @@ -51,53 +117,29 @@ public abstract class AbstractTestAsyncTableScan { protected static int COUNT = 1000; - protected static AsyncConnection ASYNC_CONN; - - @BeforeClass - public static void setUp() throws Exception { - TEST_UTIL.startMiniCluster(3); - byte[][] splitKeys = new byte[8][]; - for (int i = 111; i < 999; i += 111) { - splitKeys[i / 111 - 1] = Bytes.toBytes(String.format("%03d", i)); - } - TEST_UTIL.createTable(TABLE_NAME, FAMILY, splitKeys); - TEST_UTIL.waitTableAvailable(TABLE_NAME); - ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); - ASYNC_CONN.getTable(TABLE_NAME).putAll(IntStream.range(0, COUNT) - .mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, CQ1, Bytes.toBytes(i)).addColumn(FAMILY, CQ2, Bytes.toBytes(i * i))) - .collect(Collectors.toList())).get(); - } - - @AfterClass - public static void tearDown() throws Exception { - ASYNC_CONN.close(); - TEST_UTIL.shutdownMiniCluster(); - } - - protected static Scan createNormalScan() { + private static Scan createNormalScan() { return new Scan(); } - protected static Scan createBatchScan() { + private static Scan createBatchScan() { return new Scan().setBatch(1); } // set a small result size for testing flow control - protected static Scan createSmallResultSizeScan() { + private static Scan createSmallResultSizeScan() { return new Scan().setMaxResultSize(1); } - protected static Scan createBatchSmallResultSizeScan() { + private static Scan createBatchSmallResultSizeScan() { return new Scan().setBatch(1).setMaxResultSize(1); } - protected static AsyncTable getRawTable() { - return ASYNC_CONN.getTable(TABLE_NAME); + private static AsyncTable getRawTable() { + return connectionRule.getAsyncConnection().getTable(TABLE_NAME); } - protected static AsyncTable getTable() { - return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + private static AsyncTable getTable() { + return connectionRule.getAsyncConnection().getTable(TABLE_NAME, ForkJoinPool.commonPool()); } private static List>> getScanCreator() { @@ -131,8 +173,18 @@ protected static List getTableAndScanCreatorParams() { protected abstract List doScan(Scan scan, int closeAfter) throws Exception; + /** + * Used by implementation classes to assert the correctness of spans produced under test. + */ + protected abstract void assertTraceContinuity(); + + /** + * Used by implementation classes to assert the correctness of spans having errors. + */ + protected abstract void assertTraceError(final Matcher exceptionTypeNameMatcher); + protected final List convertFromBatchResult(List results) { - assertTrue(results.size() % 2 == 0); + assertEquals(0, results.size() % 2); return IntStream.range(0, results.size() / 2).mapToObj(i -> { try { return Result @@ -143,16 +195,25 @@ protected final List convertFromBatchResult(List results) { }).collect(Collectors.toList()); } + protected static void waitForSpan(final Matcher parentSpanMatcher) { + final Configuration conf = miniClusterRule.getTestingUtility().getConfiguration(); + Waiter.waitFor(conf, TimeUnit.SECONDS.toMillis(5), new MatcherPredicate<>( + "Span for test failed to complete.", otelClassRule::getSpans, hasItem(parentSpanMatcher))); + } + @Test public void testScanAll() throws Exception { List results = doScan(createScan(), -1); // make sure all scanners are closed at RS side - TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream().map(t -> t.getRegionServer()) - .forEach( - rs -> assertEquals( - "The scanner count of " + rs.getServerName() + " is " + - rs.getRSRpcServices().getScannersCount(), - 0, rs.getRSRpcServices().getScannersCount())); + miniClusterRule.getTestingUtility() + .getHBaseCluster() + .getRegionServerThreads() + .stream() + .map(JVMClusterUtil.RegionServerThread::getRegionServer) + .forEach(rs -> assertEquals( + "The scanner count of " + rs.getServerName() + " is " + + rs.getRSRpcServices().getScannersCount(), + 0, rs.getRSRpcServices().getScannersCount())); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> { Result result = results.get(i); @@ -169,37 +230,55 @@ private void assertResultEquals(Result result, int i) { @Test public void testReversedScanAll() throws Exception { - List results = doScan(createScan().setReversed(true), -1); + List results = TraceUtil.trace( + () -> doScan(createScan().setReversed(true), -1), testName.getMethodName()); assertEquals(COUNT, results.size()); IntStream.range(0, COUNT).forEach(i -> assertResultEquals(results.get(i), COUNT - i - 1)); + assertTraceContinuity(); } @Test public void testScanNoStopKey() throws Exception { int start = 345; - List results = - doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1); + List results = TraceUtil.trace(() -> + doScan(createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))), -1), + testName.getMethodName()); assertEquals(COUNT - start, results.size()); IntStream.range(0, COUNT - start).forEach(i -> assertResultEquals(results.get(i), start + i)); + assertTraceContinuity(); } @Test public void testReverseScanNoStopKey() throws Exception { int start = 765; - List results = doScan( - createScan().withStartRow(Bytes.toBytes(String.format("%03d", start))).setReversed(true), -1); + final Scan scan = createScan() + .withStartRow(Bytes.toBytes(String.format("%03d", start))) + .setReversed(true); + List results = TraceUtil.trace(() -> doScan(scan, -1), testName.getMethodName()); assertEquals(start + 1, results.size()); IntStream.range(0, start + 1).forEach(i -> assertResultEquals(results.get(i), start - i)); + assertTraceContinuity(); } @Test - public void testScanWrongColumnFamily() throws Exception { - try { - doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1); - } catch (Exception e) { - assertTrue(e instanceof NoSuchColumnFamilyException || - e.getCause() instanceof NoSuchColumnFamilyException); + public void testScanWrongColumnFamily() { + final Exception e = assertThrows(Exception.class, () -> TraceUtil.trace( + () -> doScan(createScan().addFamily(Bytes.toBytes("WrongColumnFamily")), -1), + testName.getMethodName())); + // hamcrest generic enforcement for `anyOf` is a pain; skip it + // but -- don't we always unwrap ExecutionExceptions -- bug? + if (e instanceof NoSuchColumnFamilyException) { + final NoSuchColumnFamilyException ex = (NoSuchColumnFamilyException) e; + assertThat(ex, isA(NoSuchColumnFamilyException.class)); + } else if (e instanceof ExecutionException) { + final ExecutionException ex = (ExecutionException) e; + assertThat(ex, allOf( + isA(ExecutionException.class), + hasProperty("cause", isA(NoSuchColumnFamilyException.class)))); + } else { + fail("Found unexpected Exception " + e); } + assertTraceError(endsWith(NoSuchColumnFamilyException.class.getName())); } private void testScan(int start, boolean startInclusive, int stop, boolean stopInclusive, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/LimitedScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/LimitedScanResultConsumer.java new file mode 100644 index 000000000000..28fbc5ab28f4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/LimitedScanResultConsumer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +/** + * Advise the scanning infrastructure to collect up to {@code limit} results. + */ +class LimitedScanResultConsumer extends SimpleScanResultConsumerImpl { + + private final int limit; + + public LimitedScanResultConsumer(int limit) { + this.limit = limit; + } + + @Override + public synchronized boolean onNext(Result result) { + return super.onNext(result) && results.size() < limit; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java index fce6773f7a6c..b019fe75420b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,59 +17,15 @@ */ package org.apache.hadoop.hbase.client; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; - -import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -final class SimpleScanResultConsumer implements ScanResultConsumer { - - private ScanMetrics scanMetrics; - - private final List results = new ArrayList<>(); - - private Throwable error; - - private boolean finished = false; - - @Override - public void onScanMetricsCreated(ScanMetrics scanMetrics) { - this.scanMetrics = scanMetrics; - } - - @Override - public synchronized boolean onNext(Result result) { - results.add(result); - return true; - } - - @Override - public synchronized void onError(Throwable error) { - this.error = error; - finished = true; - notifyAll(); - } - - @Override - public synchronized void onComplete() { - finished = true; - notifyAll(); - } +/** + * A simplistic {@link ScanResultConsumer} for use in tests. + */ +public interface SimpleScanResultConsumer extends ScanResultConsumer { - public synchronized List getAll() throws Exception { - while (!finished) { - wait(); - } - if (error != null) { - Throwables.propagateIfPossible(error, Exception.class); - throw new Exception(error); - } - return results; - } + List getAll() throws Exception; - public ScanMetrics getScanMetrics() { - return scanMetrics; - } + ScanMetrics getScanMetrics(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumerImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumerImpl.java new file mode 100644 index 000000000000..98941fece196 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/SimpleScanResultConsumerImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hbase.thirdparty.com.google.common.base.Throwables; + +class SimpleScanResultConsumerImpl implements SimpleScanResultConsumer { + + private ScanMetrics scanMetrics; + + protected final List results = new ArrayList<>(); + + private Throwable error; + + private boolean finished = false; + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + this.scanMetrics = scanMetrics; + } + + @Override + public synchronized boolean onNext(Result result) { + results.add(result); + return true; + } + + @Override + public synchronized void onError(Throwable error) { + this.error = error; + finished = true; + notifyAll(); + } + + @Override + public synchronized void onComplete() { + finished = true; + notifyAll(); + } + + @Override + public synchronized List getAll() throws Exception { + while (!finished) { + wait(); + } + if (error != null) { + Throwables.propagateIfPossible(error, Exception.class); + throw new Exception(error); + } + return results; + } + + @Override + public ScanMetrics getScanMetrics() { + return scanMetrics; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java index c1797f3833c3..7ea7388b78cb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScan.java @@ -17,24 +17,40 @@ */ package org.apache.hadoop.hbase.client; -import java.util.ArrayList; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; +import java.util.Objects; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScan extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScan.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -58,8 +74,8 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { - AsyncTable table = - ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + AsyncTable table = connectionRule.getAsyncConnection() + .getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results; if (closeAfter > 0) { // these tests batch settings with the sample data result in each result being @@ -68,11 +84,13 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { if (scan.getBatch() > 0) { closeAfter = closeAfter * 2; } - LimitedScanResultConsumer consumer = new LimitedScanResultConsumer(closeAfter); + TracedScanResultConsumer consumer = + new TracedScanResultConsumer(new LimitedScanResultConsumer(closeAfter)); table.scan(scan, consumer); results = consumer.getAll(); } else { - SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + TracedScanResultConsumer consumer = + new TracedScanResultConsumer(new SimpleScanResultConsumerImpl()); table.scan(scan, consumer); results = consumer.getAll(); } @@ -82,49 +100,115 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { return results; } - private static class LimitedScanResultConsumer implements ScanResultConsumer { - - private final int limit; - - public LimitedScanResultConsumer(int limit) { - this.limit = limit; - } - - private final List results = new ArrayList<>(); - - private Throwable error; - - private boolean finished = false; - - @Override - public synchronized boolean onNext(Result result) { - results.add(result); - return results.size() < limit; + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); } - @Override - public synchronized void onError(Throwable error) { - this.error = error; - finished = true; - notifyAll(); - } + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher onScanMetricsCreatedMatcher = + hasName("TracedScanResultConsumer#onScanMetricsCreated"); + assertThat(spans, hasItem(onScanMetricsCreatedMatcher)); + spans.stream() + .filter(onScanMetricsCreatedMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onScanMetricsCreatedMatcher, + hasParentSpanId(scanOperationSpanId), + hasEnded()))); + + final Matcher onNextMatcher = hasName("TracedScanResultConsumer#onNext"); + assertThat(spans, hasItem(onNextMatcher)); + spans.stream() + .filter(onNextMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onNextMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + + final Matcher onCompleteMatcher = hasName("TracedScanResultConsumer#onComplete"); + assertThat(spans, hasItem(onCompleteMatcher)); + spans.stream() + .filter(onCompleteMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onCompleteMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } - @Override - public synchronized void onComplete() { - finished = true; - notifyAll(); + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); } - public synchronized List getAll() throws Exception { - while (!finished) { - wait(); - } - if (error != null) { - Throwables.propagateIfPossible(error, Exception.class); - throw new Exception(error); - } - return results; - } + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher onErrorMatcher = hasName("TracedScanResultConsumer#onError"); + assertThat(spans, hasItem(onErrorMatcher)); + spans.stream() + .filter(onErrorMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onErrorMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java index 96c2d40138ca..33460bf4dbaf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanAll.java @@ -17,21 +17,39 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScanAll extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanAll.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -72,4 +90,66 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { } return results; } + + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + } + + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java index 6420806d7ee8..9c7f024c99c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java @@ -123,7 +123,7 @@ private static Pair, ScanMetrics> doScanWithRawAsyncTable(Scan scan private static Pair, ScanMetrics> doScanWithAsyncTableScan(Scan scan) throws Exception { - SimpleScanResultConsumer consumer = new SimpleScanResultConsumer(); + SimpleScanResultConsumerImpl consumer = new SimpleScanResultConsumerImpl(); CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()).scan(scan, consumer); return Pair.newPair(consumer.getAll(), consumer.getScanMetrics()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java index 2e990f763da0..9f3d4de54452 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanner.java @@ -17,23 +17,41 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.ForkJoinPool; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestAsyncTableScanner extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestAsyncTableScanner.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -63,7 +81,8 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { - AsyncTable table = ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool()); + AsyncTable table = connectionRule.getAsyncConnection() + .getTable(TABLE_NAME, ForkJoinPool.commonPool()); List results = new ArrayList<>(); // these tests batch settings with the sample data result in each result being // split in two. so we must allow twice the expected results in order to reach @@ -84,4 +103,65 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { } return results; } + + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + assertThat(spans, hasItem(allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } + + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 26c201e19865..bfa24661954a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -17,22 +17,41 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasExceptionWithType; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasParentSpanId; +import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.startsWith; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.sdk.trace.data.SpanData; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.hamcrest.Matcher; import org.junit.ClassRule; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @RunWith(Parameterized.class) @Category({ LargeTests.class, ClientTests.class }) public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { + private static final Logger logger = LoggerFactory.getLogger(TestRawAsyncTableScan.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -56,8 +75,8 @@ protected Scan createScan() { @Override protected List doScan(Scan scan, int closeAfter) throws Exception { - BufferingScanResultConsumer scanConsumer = new BufferingScanResultConsumer(); - ASYNC_CONN.getTable(TABLE_NAME).scan(scan, scanConsumer); + TracedAdvancedScanResultConsumer scanConsumer = new TracedAdvancedScanResultConsumer(); + connectionRule.getAsyncConnection().getTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); // these tests batch settings with the sample data result in each result being // split in two. so we must allow twice the expected results in order to reach @@ -76,4 +95,113 @@ protected List doScan(Scan scan, int closeAfter) throws Exception { } return results; } + + @Override + protected void assertTraceContinuity() { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf( + hasName(parentSpanName), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + // RawAsyncTableImpl never invokes the callback to `onScanMetricsCreated` -- bug? + final Matcher onScanMetricsCreatedMatcher = + hasName("TracedAdvancedScanResultConsumer#onScanMetricsCreated"); + assertThat(spans, not(hasItem(onScanMetricsCreatedMatcher))); + + final Matcher onNextMatcher = hasName("TracedAdvancedScanResultConsumer#onNext"); + assertThat(spans, hasItem(onNextMatcher)); + spans.stream() + .filter(onNextMatcher::matches) + .forEach(span -> assertThat(span, hasParentSpanId(scanOperationSpanId))); + assertThat(spans, hasItem(allOf( + onNextMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + + final Matcher onCompleteMatcher = + hasName("TracedAdvancedScanResultConsumer#onComplete"); + assertThat(spans, hasItem(onCompleteMatcher)); + spans.stream() + .filter(onCompleteMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onCompleteMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } + + @Override + protected void assertTraceError(Matcher exceptionTypeNameMatcher) { + final String parentSpanName = testName.getMethodName(); + final Matcher parentSpanMatcher = allOf(hasName(parentSpanName), hasEnded()); + waitForSpan(parentSpanMatcher); + + final List spans = otelClassRule.getSpans() + .stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + if (logger.isDebugEnabled()) { + StringTraceRenderer stringTraceRenderer = new StringTraceRenderer(spans); + stringTraceRenderer.render(logger::debug); + } + + final String parentSpanId = spans.stream() + .filter(parentSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher scanOperationSpanMatcher = allOf( + hasName(startsWith("SCAN " + TABLE_NAME.getNameWithNamespaceInclAsString())), + hasParentSpanId(parentSpanId), + hasStatusWithCode(StatusCode.ERROR), + hasExceptionWithType(exceptionTypeNameMatcher), + hasEnded()); + assertThat(spans, hasItem(scanOperationSpanMatcher)); + final String scanOperationSpanId = spans.stream() + .filter(scanOperationSpanMatcher::matches) + .map(SpanData::getSpanId) + .findAny() + .orElseThrow(AssertionError::new); + + final Matcher onCompleteMatcher = hasName("TracedAdvancedScanResultConsumer#onError"); + assertThat(spans, hasItem(onCompleteMatcher)); + spans.stream() + .filter(onCompleteMatcher::matches) + .forEach(span -> assertThat(span, allOf( + onCompleteMatcher, + hasParentSpanId(scanOperationSpanId), + hasStatusWithCode(StatusCode.OK), + hasEnded()))); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedAdvancedScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedAdvancedScanResultConsumer.java new file mode 100644 index 000000000000..702e16a14635 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedAdvancedScanResultConsumer.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.trace.TraceUtil; + +/** + * A drop-in replacement for {@link BufferingScanResultConsumer} that adds tracing spans to its + * implementation of the {@link AdvancedScanResultConsumer} API. + */ +public class TracedAdvancedScanResultConsumer implements AdvancedScanResultConsumer { + + private final BufferingScanResultConsumer delegate = new BufferingScanResultConsumer(); + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + TraceUtil.trace( + () -> delegate.onScanMetricsCreated(scanMetrics), + "TracedAdvancedScanResultConsumer#onScanMetricsCreated"); + } + + @Override + public void onNext(Result[] results, ScanController controller) { + TraceUtil.trace( + () -> delegate.onNext(results, controller), + "TracedAdvancedScanResultConsumer#onNext"); + } + + @Override + public void onError(Throwable error) { + TraceUtil.trace( + () -> delegate.onError(error), + "TracedAdvancedScanResultConsumer#onError"); + } + + @Override + public void onComplete() { + TraceUtil.trace(delegate::onComplete, "TracedAdvancedScanResultConsumer#onComplete"); + } + + public Result take() throws IOException, InterruptedException { + return delegate.take(); + } + + public ScanMetrics getScanMetrics() { + return delegate.getScanMetrics(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedScanResultConsumer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedScanResultConsumer.java new file mode 100644 index 000000000000..0427218038d2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TracedScanResultConsumer.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.List; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.trace.TraceUtil; + +/** + * A wrapper over {@link SimpleScanResultConsumer} that adds tracing of spans to its + * implementation. + */ +class TracedScanResultConsumer implements SimpleScanResultConsumer { + + private final SimpleScanResultConsumer delegate; + + public TracedScanResultConsumer(final SimpleScanResultConsumer delegate) { + this.delegate = delegate; + } + + @Override + public void onScanMetricsCreated(ScanMetrics scanMetrics) { + TraceUtil.trace( + () -> delegate.onScanMetricsCreated(scanMetrics), + "TracedScanResultConsumer#onScanMetricsCreated"); + } + + @Override + public boolean onNext(Result result) { + return TraceUtil.trace(() -> delegate.onNext(result), + "TracedScanResultConsumer#onNext"); + } + + @Override + public void onError(Throwable error) { + TraceUtil.trace(() -> delegate.onError(error), "TracedScanResultConsumer#onError"); + } + + @Override + public void onComplete() { + TraceUtil.trace(delegate::onComplete, "TracedScanResultConsumer#onComplete"); + } + + @Override + public List getAll() throws Exception { + return delegate.getAll(); + } + + @Override + public ScanMetrics getScanMetrics() { + return delegate.getScanMetrics(); + } +} diff --git a/pom.xml b/pom.xml index 75b1ae4bc8a4..507caa394469 100755 --- a/pom.xml +++ b/pom.xml @@ -1881,12 +1881,13 @@ -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true -Djava.awt.headless=true -Djdk.net.URLClassPath.disableClassPathURLCheck=true -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced - -Dio.netty.eventLoopThreads=3 + -Dio.netty.eventLoopThreads=3 -Dio.opentelemetry.context.enableStrictContext=true -enableassertions -Xmx${surefire.cygwinXmx} -Djava.security.egd=file:/dev/./urandom -Djava.net.preferIPv4Stack=true "-Djava.library.path=${hadoop.library.path};${java.library.path}" -Dorg.apache.hbase.thirdparty.io.netty.leakDetection.level=advanced + -Dio.opentelemetry.context.enableStrictContext=true ${hbase-surefire.argLine}