Skip to content

Commit

Permalink
feat: MirrorinAsyncTable: scan(), scanAll() (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
prawilny authored Nov 26, 2021
1 parent 33a0903 commit 09cea12
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
Expand All @@ -49,6 +50,8 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Ignore;
Expand Down Expand Up @@ -1357,6 +1360,61 @@ public void testResultScanner() throws IOException {
}
}

@Test
public void testScanAll() throws IOException, ExecutionException, InterruptedException {
int databaseEntriesCount = 1000;

TableName tableName = connectionRule.createTable(columnFamily1);
databaseHelpers.fillTable(tableName, databaseEntriesCount, columnFamily1, qualifier1);

try (MirroringAsyncConnection asyncConnection =
asyncConnectionRule.createAsyncConnection(config)) {
assertThat(asyncConnection.getTable(tableName).scanAll(new Scan()).get().size())
.isEqualTo(databaseEntriesCount);
}
}

@Test
public void testBasicScan() throws IOException, ExecutionException, InterruptedException {
int databaseEntriesCount = 1000;

TableName tableName = connectionRule.createTable(columnFamily1);
databaseHelpers.fillTable(tableName, databaseEntriesCount, columnFamily1, qualifier1);

AtomicInteger read = new AtomicInteger(0);
CompletableFuture<Void> scanConsumerEnded = new CompletableFuture<>();

ScanResultConsumer consumer =
new ScanResultConsumer() {
@Override
public boolean onNext(Result result) {
read.incrementAndGet();
return true;
}

@Override
public void onError(Throwable throwable) {
scanConsumerEnded.completeExceptionally(throwable);
}

@Override
public void onComplete() {
scanConsumerEnded.complete(null);
}
};

try (MirroringAsyncConnection asyncConnection =
asyncConnectionRule.createAsyncConnection(config)) {
asyncConnection
.getTableBuilder(tableName, this.executorServiceRule.executorService)
.build()
.scan(new Scan(), consumer);
}
scanConsumerEnded.get();

assertThat(read.get()).isEqualTo(databaseEntriesCount);
}

private void checkIfShouldHaveThrown(CompletableFuture<?> future, byte[] rowKey) {
assertThat(failPredicate.apply(rowKey)).isEqualTo(future.isCompletedExceptionally());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ private <T> OperationStages<T> wrapWithReferenceCounter(OperationStages<T> toBeR
return toBeReferenceCounted;
}

private void keepReferenceUntilOperationCompletes(CompletableFuture<Void> future) {
private <T> void keepReferenceUntilOperationCompletes(CompletableFuture<T> future) {
this.referenceCounter.incrementReferenceCount();
future.whenComplete(
(ignoredResult, ignoredError) -> this.referenceCounter.decrementReferenceCount());
Expand Down Expand Up @@ -506,18 +506,20 @@ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
}

@Override
public Configuration getConfiguration() {
return primaryTable.getConfiguration();
public void scan(Scan scan, C consumer) {
this.primaryTable.scan(scan, consumer);
}

@Override
public void scan(Scan scan, C c) {
throw new UnsupportedOperationException();
public CompletableFuture<List<Result>> scanAll(Scan scan) {
CompletableFuture<List<Result>> result = this.primaryTable.scanAll(scan);
keepReferenceUntilOperationCompletes(result);
return result;
}

@Override
public CompletableFuture<List<Result>> scanAll(Scan scan) {
throw new UnsupportedOperationException();
public Configuration getConfiguration() {
return primaryTable.getConfiguration();
}

private class MirroringCheckAndMutateBuilder implements CheckAndMutateBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Delete;
Expand All @@ -72,6 +74,7 @@
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.client.ScanResultConsumerBase;
import org.apache.hadoop.hbase.io.TimeRange;
import org.junit.Before;
Expand Down Expand Up @@ -1097,4 +1100,37 @@ public void testGetScanner() {
verify(secondaryTable, times(1)).getScanner(scan);
assertThat(scanner).isInstanceOf(MirroringResultScanner.class);
}

@Test
public void testScanWithScanResultConsumer() {
Scan scan = new Scan();
ScanResultConsumer consumer = mock(ScanResultConsumer.class);
mirroringTable.scan(scan, consumer);

verify(primaryTable, times(1)).scan(eq(scan), any(ScanResultConsumer.class));
verify(secondaryTable, never()).scan(any(Scan.class), any(ScanResultConsumer.class));
}

@Test
public void testScanWithAdvancedScanResultConsumer() {
Scan scan = new Scan();
AdvancedScanResultConsumer consumer = mock(AdvancedScanResultConsumer.class);
mirroringTable.scan(scan, consumer);

verify(primaryTable, times(1)).scan(eq(scan), any(AdvancedScanResultConsumer.class));
verify(secondaryTable, never()).scan(any(Scan.class), any(AdvancedScanResultConsumer.class));
}

@Test
public void testScanAll() {
Scan scan = new Scan();

CompletableFuture<List<Result>> scanAllFuture = new CompletableFuture<>();
when(primaryTable.scanAll(any(Scan.class))).thenReturn(scanAllFuture);

CompletableFuture<List<Result>> results = mirroringTable.scanAll(scan);
verify(primaryTable, times(1)).scanAll(scan);
verify(secondaryTable, never()).scanAll(any(Scan.class));
assertThat(results).isEqualTo(scanAllFuture);
}
}

0 comments on commit 09cea12

Please sign in to comment.