From 369c34f4f47f6ce969bc4f85c98dc76f97ae7a78 Mon Sep 17 00:00:00 2001 From: Adam Czajkowski Date: Tue, 2 Nov 2021 15:30:26 +0100 Subject: [PATCH] feat: MirrorinAsyncTable: scan, scanAll --- .../mirroring/TestMirroringAsyncTable.java | 58 +++++++++++++++++++ .../hbase2_x/MirroringAsyncTable.java | 16 ++--- .../hbase2_x/TestMirroringAsyncTable.java | 36 ++++++++++++ 3 files changed, 103 insertions(+), 7 deletions(-) diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java index 7fce2668f1..fe2d3c09ba 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java @@ -37,6 +37,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.hadoop.conf.Configuration; @@ -49,6 +50,8 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScanResultConsumer; import org.junit.Assume; import org.junit.ClassRule; import org.junit.Ignore; @@ -1357,6 +1360,61 @@ public void testResultScanner() throws IOException { } } + @Test + public void testScanAll() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + TableName tableName = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName, databaseEntriesCount, columnFamily1, qualifier1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + assertThat(asyncConnection.getTable(tableName).scanAll(new Scan()).get().size()) + .isEqualTo(databaseEntriesCount); + } + } + + @Test + public void testBasicScan() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + TableName tableName = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName, databaseEntriesCount, columnFamily1, qualifier1); + + AtomicInteger read = new AtomicInteger(0); + CompletableFuture scanConsumerEnded = new CompletableFuture<>(); + + ScanResultConsumer consumer = + new ScanResultConsumer() { + @Override + public boolean onNext(Result result) { + read.incrementAndGet(); + return true; + } + + @Override + public void onError(Throwable throwable) { + scanConsumerEnded.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + scanConsumerEnded.complete(null); + } + }; + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + asyncConnection + .getTableBuilder(tableName, this.executorServiceRule.executorService) + .build() + .scan(new Scan(), consumer); + } + scanConsumerEnded.get(); + + assertThat(read.get()).isEqualTo(databaseEntriesCount); + } + private void checkIfShouldHaveThrown(CompletableFuture future, byte[] rowKey) { assertThat(failPredicate.apply(rowKey)).isEqualTo(future.isCompletedExceptionally()); } diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java index de6f4b7ad8..bec2cd2df2 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java @@ -478,7 +478,7 @@ private OperationStages wrapWithReferenceCounter(OperationStages toBeR return toBeReferenceCounted; } - private void keepReferenceUntilOperationCompletes(CompletableFuture future) { + private void keepReferenceUntilOperationCompletes(CompletableFuture future) { this.referenceCounter.incrementReferenceCount(); future.whenComplete( (ignoredResult, ignoredError) -> this.referenceCounter.decrementReferenceCount()); @@ -506,18 +506,20 @@ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) { } @Override - public Configuration getConfiguration() { - return primaryTable.getConfiguration(); + public void scan(Scan scan, C consumer) { + this.primaryTable.scan(scan, consumer); } @Override - public void scan(Scan scan, C c) { - throw new UnsupportedOperationException(); + public CompletableFuture> scanAll(Scan scan) { + CompletableFuture> result = this.primaryTable.scanAll(scan); + keepReferenceUntilOperationCompletes(result); + return result; } @Override - public CompletableFuture> scanAll(Scan scan) { - throw new UnsupportedOperationException(); + public Configuration getConfiguration() { + return primaryTable.getConfiguration(); } private class MirroringCheckAndMutateBuilder implements CheckAndMutateBuilder { diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java index 6a142bf0c2..2b0e0f96ce 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java @@ -27,6 +27,7 @@ import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -59,6 +60,7 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.Delete; @@ -72,6 +74,7 @@ import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.ScanResultConsumer; import org.apache.hadoop.hbase.client.ScanResultConsumerBase; import org.apache.hadoop.hbase.io.TimeRange; import org.junit.Before; @@ -1097,4 +1100,37 @@ public void testGetScanner() { verify(secondaryTable, times(1)).getScanner(scan); assertThat(scanner).isInstanceOf(MirroringResultScanner.class); } + + @Test + public void testScanWithScanResultConsumer() { + Scan scan = new Scan(); + ScanResultConsumer consumer = mock(ScanResultConsumer.class); + mirroringTable.scan(scan, consumer); + + verify(primaryTable, times(1)).scan(eq(scan), any(ScanResultConsumer.class)); + verify(secondaryTable, never()).scan(any(Scan.class), any(ScanResultConsumer.class)); + } + + @Test + public void testScanWithAdvancedScanResultConsumer() { + Scan scan = new Scan(); + AdvancedScanResultConsumer consumer = mock(AdvancedScanResultConsumer.class); + mirroringTable.scan(scan, consumer); + + verify(primaryTable, times(1)).scan(eq(scan), any(AdvancedScanResultConsumer.class)); + verify(secondaryTable, never()).scan(any(Scan.class), any(AdvancedScanResultConsumer.class)); + } + + @Test + public void testScanAll() { + Scan scan = new Scan(); + + CompletableFuture> scanAllFuture = new CompletableFuture<>(); + when(primaryTable.scanAll(any(Scan.class))).thenReturn(scanAllFuture); + + CompletableFuture> results = mirroringTable.scanAll(scan); + verify(primaryTable, times(1)).scanAll(scan); + verify(secondaryTable, never()).scanAll(any(Scan.class)); + assertThat(results).isEqualTo(scanAllFuture); + } }