From 6950f05d9b1178f49d49babbf3e00cc9cb696bce Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Thu, 14 Oct 2021 14:07:53 +0200 Subject: [PATCH] feat: concurrent writes in MirroringTable --- .../hbase1_x/MirroringConnection.java | 3 + .../mirroring/hbase1_x/MirroringOptions.java | 4 + .../hbase1_x/MirroringResultScanner.java | 2 +- .../mirroring/hbase1_x/MirroringTable.java | 289 +++++++--- .../utils/MirroringConfigurationHelper.java | 9 + .../hbase1_x/utils/RequestScheduling.java | 67 +-- .../mirroring/hbase1_x/TestHelpers.java | 74 +++ .../hbase1_x/TestMirroringMetrics.java | 20 +- .../hbase1_x/TestMirroringTable.java | 526 ++++++++++++------ .../TestMirroringTableInputModification.java | 1 + .../hbase1_x/TestVerificationSampling.java | 1 + 11 files changed, 725 insertions(+), 271 deletions(-) diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java index a661e89b05..f2ac331b3a 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java @@ -64,6 +64,7 @@ public class MirroringConnection implements Connection { private final Connection secondaryConnection; private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean aborted = new AtomicBoolean(false); + private final boolean performWritesConcurrently; /** * The constructor called from {@link @@ -117,6 +118,7 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService new SecondaryWriteErrorConsumerWithMetrics( this.mirroringTracer, secondaryWriteErrorConsumer); this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate); + this.performWritesConcurrently = this.configuration.mirroringOptions.performWritesConcurrently; } @Override @@ -154,6 +156,7 @@ public Table call() throws IOException { this.flowController, this.secondaryWriteErrorConsumer, this.readSampler, + this.performWritesConcurrently, this.mirroringTracer); this.referenceCounter.holdReferenceUntilClosing(table); return table; diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java index e71a07c3e6..6968c1f162 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.mirroring.hbase1_x; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_BUFFERED_MUTATOR_BYTES_TO_FLUSH; +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONCURRENT_WRITES; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_STRATEGY_CLASS; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_MISMATCH_DETECTOR_CLASS; @@ -46,6 +47,8 @@ public class MirroringOptions { public final String writeErrorLogAppenderClass; public final String writeErrorLogSerializerClass; + public final boolean performWritesConcurrently; + public MirroringOptions(Configuration configuration) { this.mismatchDetectorClass = configuration.get( @@ -76,5 +79,6 @@ public MirroringOptions(Configuration configuration) { this.writeErrorLogSerializerClass = configuration.get( MIRRORING_WRITE_ERROR_LOG_SERIALIZER_CLASS, DefaultSerializer.class.getCanonicalName()); + this.performWritesConcurrently = configuration.getBoolean(MIRRORING_CONCURRENT_WRITES, false); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java index 079f89419f..714e592334 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringResultScanner.java @@ -220,7 +220,7 @@ private void scheduleRequest( return; } this.listenableReferenceCounter.holdReferenceUntilCompletion( - RequestScheduling.scheduleVerificationAndRequestWithFlowControl( + RequestScheduling.scheduleRequestAndVerificationWithFlowControl( requestResourcesDescription, nextSupplier, this.mirroringTracer.spanFactory.wrapReadVerificationCallback(scannerNext), diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java index 3cebf296f0..208af4a6e8 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java @@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; @@ -110,7 +111,7 @@ public boolean apply(@NullableDecl Object o) { private final MirroringTracer mirroringTracer; private final ReadSampler readSampler; - + private final boolean performWritesConcurrently; /** * @param executorService ExecutorService is used to perform operations on secondaryTable and * verification tasks. @@ -126,6 +127,7 @@ public MirroringTable( FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, ReadSampler readSampler, + boolean performWritesConcurrently, MirroringTracer mirroringTracer) { this.primaryTable = primaryTable; this.secondaryTable = secondaryTable; @@ -140,6 +142,7 @@ public MirroringTable( this.referenceCounter = new ListenableReferenceCounter(); this.referenceCounter.holdReferenceUntilClosing(this.secondaryAsyncWrapper); this.secondaryWriteErrorConsumer = secondaryWriteErrorConsumer; + this.performWritesConcurrently = performWritesConcurrently; this.mirroringTracer = mirroringTracer; } @@ -192,7 +195,7 @@ public Boolean call() throws IOException { HBaseOperation.EXISTS); if (this.readSampler.shouldNextReadOperationBeSampled()) { - scheduleVerificationAndRequestWithFlowControl( + scheduleSequentialReadOperationWithVerification( new RequestResourcesDescription(result), this.secondaryAsyncWrapper.exists(get), this.verificationContinuationFactory.exists(get, result)); @@ -218,7 +221,7 @@ public boolean[] call() throws IOException { HBaseOperation.EXISTS_ALL); if (this.readSampler.shouldNextReadOperationBeSampled()) { - scheduleVerificationAndRequestWithFlowControl( + scheduleSequentialReadOperationWithVerification( new RequestResourcesDescription(result), this.secondaryAsyncWrapper.existsAll(list), this.verificationContinuationFactory.existsAll(list, result)); @@ -227,25 +230,6 @@ public boolean[] call() throws IOException { } } - private void batchWithSpan(final List inputOperations, final Object[] results) - throws IOException, InterruptedException { - final List operations = new ArrayList<>(inputOperations); - Log.trace("[%s] batch(operations=%s, results)", this.getName(), operations); - try { - this.mirroringTracer.spanFactory.wrapPrimaryOperation( - new CallableThrowingIOAndInterruptedException() { - @Override - public Void call() throws IOException, InterruptedException { - MirroringTable.this.primaryTable.batch(operations, results); - return null; - } - }, - HBaseOperation.BATCH); - } finally { - scheduleSecondaryWriteBatchOperations(operations, results); - } - } - @Override public void batch(List operations, Object[] results) throws IOException, InterruptedException { @@ -264,7 +248,7 @@ public Object[] batch(List operations) throws IOException, Interr @Override public void batchCallback( - List inputOperations, Object[] results, Callback callback) + List inputOperations, Object[] results, final Callback callback) throws IOException, InterruptedException { final List operations = new ArrayList<>(inputOperations); try (Scope scope = @@ -272,11 +256,17 @@ public void batchCallback( Log.trace( "[%s] batchCallback(operations=%s, results, callback=%s)", this.getName(), operations, callback); - try { - this.primaryTable.batchCallback(operations, results, callback); - } finally { - scheduleSecondaryWriteBatchOperations(operations, results); - } + + batchWithSpan( + operations, + results, + new BatchOperation() { + @Override + public void perform(List operations, Object[] results) + throws IOException, InterruptedException { + primaryTable.batchCallback(operations, results, callback); + } + }); } } @@ -306,12 +296,11 @@ public Result call() throws IOException { HBaseOperation.GET); if (this.readSampler.shouldNextReadOperationBeSampled()) { - scheduleVerificationAndRequestWithFlowControl( + scheduleSequentialReadOperationWithVerification( new RequestResourcesDescription(result), this.secondaryAsyncWrapper.get(get), this.verificationContinuationFactory.get(get, result)); } - return result; } } @@ -333,7 +322,7 @@ public Result[] call() throws IOException { HBaseOperation.GET_LIST); if (this.readSampler.shouldNextReadOperationBeSampled()) { - scheduleVerificationAndRequestWithFlowControl( + scheduleSequentialReadOperationWithVerification( new RequestResourcesDescription(result), this.secondaryAsyncWrapper.get(list), this.verificationContinuationFactory.get(list, result)); @@ -423,18 +412,7 @@ public Void call() throws IOException { public void put(final Put put) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.PUT)) { Log.trace("[%s] put(put=%s)", this.getName(), put); - - this.mirroringTracer.spanFactory.wrapPrimaryOperation( - new CallableThrowingIOException() { - @Override - public Void call() throws IOException { - MirroringTable.this.primaryTable.put(put); - return null; - } - }, - HBaseOperation.PUT); - scheduleWriteWithControlFlow( - new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put)); + this.batchSingleWriteOperation(put); } } @@ -481,17 +459,7 @@ public boolean checkAndPut( public void delete(final Delete delete) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.DELETE)) { Log.trace("[%s] delete(delete=%s)", this.getName(), delete); - this.mirroringTracer.spanFactory.wrapPrimaryOperation( - new CallableThrowingIOException() { - @Override - public Void call() throws IOException { - MirroringTable.this.primaryTable.delete(delete); - return null; - } - }, - HBaseOperation.DELETE); - scheduleWriteWithControlFlow( - new WriteOperationInfo(delete), this.secondaryAsyncWrapper.delete(delete)); + this.batchSingleWriteOperation(delete); } } @@ -546,19 +514,7 @@ public boolean checkAndDelete( public void mutateRow(final RowMutations rowMutations) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.MUTATE_ROW)) { Log.trace("[%s] mutateRow(rowMutations=%s)", this.getName(), rowMutations); - - this.mirroringTracer.spanFactory.wrapPrimaryOperation( - new CallableThrowingIOException() { - @Override - public Void call() throws IOException { - MirroringTable.this.primaryTable.mutateRow(rowMutations); - return null; - } - }, - HBaseOperation.MUTATE_ROW); - - scheduleWriteWithControlFlow( - new WriteOperationInfo(rowMutations), this.secondaryAsyncWrapper.mutateRow(rowMutations)); + batchSingleWriteOperation(rowMutations); } } @@ -579,7 +535,7 @@ public Result call() throws IOException { Put put = makePutFromResult(result); - scheduleWriteWithControlFlow( + scheduleSequentialWriteOperation( new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put)); return result; } @@ -602,7 +558,7 @@ public Result call() throws IOException { Put put = makePutFromResult(result); - scheduleWriteWithControlFlow( + scheduleSequentialWriteOperation( new WriteOperationInfo(put), this.secondaryAsyncWrapper.put(put)); return result; } @@ -702,7 +658,7 @@ public Boolean call() throws IOException { HBaseOperation.CHECK_AND_MUTATE); if (wereMutationsApplied) { - scheduleWriteWithControlFlow( + scheduleSequentialWriteOperation( new WriteOperationInfo(rowMutations), this.secondaryAsyncWrapper.mutateRow(rowMutations)); } return wereMutationsApplied; @@ -774,12 +730,12 @@ public void addOnCloseListener(Runnable listener) { .addListener(listener, MoreExecutors.directExecutor()); } - private void scheduleVerificationAndRequestWithFlowControl( + private void scheduleSequentialReadOperationWithVerification( final RequestResourcesDescription resultInfo, final Supplier> secondaryGetFutureSupplier, final FutureCallback verificationCallback) { this.referenceCounter.holdReferenceUntilCompletion( - RequestScheduling.scheduleVerificationAndRequestWithFlowControl( + RequestScheduling.scheduleRequestAndVerificationWithFlowControl( resultInfo, secondaryGetFutureSupplier, this.mirroringTracer.spanFactory.wrapReadVerificationCallback(verificationCallback), @@ -787,7 +743,7 @@ private void scheduleVerificationAndRequestWithFlowControl( this.mirroringTracer)); } - private void scheduleWriteWithControlFlow( + private void scheduleSequentialWriteOperation( final WriteOperationInfo writeOperationInfo, final Supplier> secondaryResultFutureSupplier) { final FlowController flowController = this.flowController; @@ -801,7 +757,7 @@ public void onFailure(Throwable throwable) { }; this.referenceCounter.holdReferenceUntilCompletion( - RequestScheduling.scheduleVerificationAndRequestWithFlowControl( + RequestScheduling.scheduleRequestAndVerificationWithFlowControl( writeOperationInfo.requestResourcesDescription, secondaryResultFutureSupplier, this.mirroringTracer.spanFactory.wrapWriteOperationCallback(writeErrorCallback), @@ -817,6 +773,189 @@ public Void apply(Throwable throwable) { })); } + private void batchSingleWriteOperation(Row operation) throws IOException { + Object[] results = new Object[1]; + try { + batchWithSpan(Collections.singletonList(operation), results); + } catch (RetriesExhaustedWithDetailsException e) { + if (e.getCause(0) instanceof IOException) { + throw (IOException) e.getCause(0); + } + throw new IOException(e.getCause(0)); + } catch (InterruptedException e) { + InterruptedIOException interruptedIOException = new InterruptedIOException(); + interruptedIOException.initCause(e); + throw interruptedIOException; + } + } + + interface BatchOperation { + void perform(List operations, Object[] results) + throws IOException, InterruptedException; + } + + private void batchWithSpan(final List inputOperations, final Object[] results) + throws IOException, InterruptedException { + batchWithSpan( + inputOperations, + results, + new BatchOperation() { + @Override + public void perform(List operations, Object[] results) + throws IOException, InterruptedException { + MirroringTable.this.primaryTable.batch(operations, results); + } + }); + } + + private void batchWithSpan( + final List inputOperations, + final Object[] results, + final BatchOperation operation) + throws IOException, InterruptedException { + final List operations = new ArrayList<>(inputOperations); + Log.trace("[%s] batch(operations=%s, results)", this.getName(), operations); + + final Object[] internalPrimaryResults = new Object[results.length]; + + CallableThrowingIOAndInterruptedException primaryOperation = + new CallableThrowingIOAndInterruptedException() { + @Override + public Void call() throws IOException, InterruptedException { + operation.perform(operations, internalPrimaryResults); + return null; + } + }; + + try { + if (!this.performWritesConcurrently || !canBatchBePerformedConcurrently(operations)) { + sequentialBatch(internalPrimaryResults, operations, primaryOperation); + } else { + concurrentBatch(internalPrimaryResults, operations, primaryOperation); + } + } finally { + System.arraycopy(internalPrimaryResults, 0, results, 0, results.length); + } + } + + private boolean canBatchBePerformedConcurrently(List operations) { + // Only Puts and Deletes can be performed concurrently. + // We assume that RowMutations can consist of only Puts and Deletes (which is true in HBase 1.x + // and 2.x). + for (Row operation : operations) { + if (!(operation instanceof Put) + && !(operation instanceof Delete) + && !(operation instanceof RowMutations)) { + return false; + } + } + return true; + } + + private void sequentialBatch( + Object[] results, + List operations, + CallableThrowingIOAndInterruptedException primaryOperation) + throws IOException, InterruptedException { + try { + this.mirroringTracer.spanFactory.wrapPrimaryOperation(primaryOperation, HBaseOperation.BATCH); + } finally { + scheduleSecondaryWriteBatchOperations(operations, results); + } + } + + private void concurrentBatch( + final Object[] primaryResults, + final List operations, + final CallableThrowingIOAndInterruptedException primaryOperation) + throws IOException, InterruptedException { + RequestResourcesDescription requestResourcesDescription = + new RequestResourcesDescription(operations, new Result[0]); + final Object[] secondaryResults = new Object[operations.size()]; + final Throwable[] primaryException = new Throwable[1]; + final Throwable[] flowControllerException = new Throwable[1]; + + // After the flow control resources have been obtained, we will schedule secondary operation and + // then run primary operation. + final Supplier> invokeBothOperations = + new Supplier>() { + @Override + public ListenableFuture get() { + // We are scheduling secondary batch to run concurrently. + ListenableFuture secondaryOperationEnded = + secondaryAsyncWrapper.batch(operations, secondaryResults).get(); + // Primary operation is then performed synchronously. + try { + primaryOperation.call(); + } catch (IOException | InterruptedException e) { + primaryException[0] = e; + } + // Primary operation has ended and its results are available to the user. + + // We want the schedule verification to after the secondary operation. + return secondaryOperationEnded; + } + }; + + FutureCallback verification = + new FutureCallback() { + private void verify() { + int numResults = primaryResults.length; + for (int i = 0; i < numResults; i++) { + Object primaryResult = primaryResults[i]; + Object secondaryResult = secondaryResults[i]; + boolean primaryFailure = resultIsFaultyPredicate.apply(primaryResult); + boolean secondaryFailure = resultIsFaultyPredicate.apply(secondaryResult); + // Primary errors will be reported directly to the user. + if (secondaryFailure && !primaryFailure) { + Throwable exception = secondaryResult == null ? null : (Throwable) secondaryResult; + + secondaryWriteErrorConsumer.consume( + HBaseOperation.BATCH, operations.get(i), exception); + } + } + } + + @Override + public void onSuccess(@NullableDecl Void result) { + verify(); + } + + @Override + public void onFailure(Throwable throwable) { + verify(); + } + }; + + this.referenceCounter.holdReferenceUntilCompletion( + RequestScheduling.scheduleRequestAndVerificationWithFlowControl( + requestResourcesDescription, + invokeBothOperations, + verification, + this.flowController, + this.mirroringTracer, + new Function() { + @NullableDecl + @Override + public Void apply(@NullableDecl Throwable throwable) { + flowControllerException[0] = throwable; + return null; + } + })); + + if (flowControllerException[0] != null) { + throw new IOException("FlowController rejected the request", flowControllerException[0]); + } + + if (primaryException[0] != null) { + if (primaryException[0] instanceof InterruptedException) { + throw (InterruptedException) primaryException[0]; + } else { + throw (IOException) primaryException[0]; + } + } + } + private void scheduleSecondaryWriteBatchOperations( final List operations, final Object[] results) { @@ -866,7 +1005,7 @@ public Void apply(Throwable throwable) { }; this.referenceCounter.holdReferenceUntilCompletion( - RequestScheduling.scheduleVerificationAndRequestWithFlowControl( + RequestScheduling.scheduleRequestAndVerificationWithFlowControl( requestResourcesDescription, this.secondaryAsyncWrapper.batch(operationsToScheduleOnSecondary, resultsSecondary), verificationFuture, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java index bd93a83b73..5a0a2fe534 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java @@ -119,6 +119,15 @@ public class MirroringConfigurationHelper { public static final String MIRRORING_BUFFERED_MUTATOR_BYTES_TO_FLUSH = "google.bigtable.mirroring.buffered-mutator.bytes-to-flush"; + /** + * When set to {@code true} writes to primary and secondary databases will be performed + * concurrently. This option reduces write-latency to secondary database, but can cause additional + * inconsistency - some writes might complete successfully on secondary database and fail on + * primary database. + */ + public static final String MIRRORING_CONCURRENT_WRITES = + "google.bigtable.mirroring.concurrent-writes"; + public static void fillConnectionConfigWithClassImplementation( Configuration connectionConfig, Configuration config, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java index b298efd703..0c5f4fff64 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/RequestScheduling.java @@ -38,13 +38,13 @@ public class RequestScheduling { private static final Logger Log = new Logger(RequestScheduling.class); - public static ListenableFuture scheduleVerificationAndRequestWithFlowControl( + public static ListenableFuture scheduleRequestAndVerificationWithFlowControl( final RequestResourcesDescription requestResourcesDescription, final Supplier> secondaryResultFutureSupplier, final FutureCallback verificationCallback, final FlowController flowController, final MirroringTracer mirroringTracer) { - return scheduleVerificationAndRequestWithFlowControl( + return scheduleRequestAndVerificationWithFlowControl( requestResourcesDescription, secondaryResultFutureSupplier, verificationCallback, @@ -58,9 +58,9 @@ public Void apply(Throwable t) { }); } - public static ListenableFuture scheduleVerificationAndRequestWithFlowControl( + public static ListenableFuture scheduleRequestAndVerificationWithFlowControl( final RequestResourcesDescription requestResourcesDescription, - final Supplier> secondaryResultFutureSupplier, + final Supplier> invokeOperation, final FutureCallback verificationCallback, final FlowController flowController, final MirroringTracer mirroringTracer, @@ -69,38 +69,11 @@ public static ListenableFuture scheduleVerificationAndRequestWithFlowC final ListenableFuture reservationRequest = flowController.asyncRequestResource(requestResourcesDescription); + final ResourceReservation reservation; try { - final ResourceReservation reservation; try (Scope scope = mirroringTracer.spanFactory.flowControlScope()) { reservation = reservationRequest.get(); } - Futures.addCallback( - secondaryResultFutureSupplier.get(), - mirroringTracer.spanFactory.wrapWithCurrentSpan( - new FutureCallback() { - @Override - public void onSuccess(@NullableDecl T t) { - try { - Log.trace("starting verification %s", t); - verificationCallback.onSuccess(t); - Log.trace("verification done %s", t); - } finally { - reservation.release(); - verificationCompletedFuture.set(null); - } - } - - @Override - public void onFailure(Throwable throwable) { - try { - verificationCallback.onFailure(throwable); - } finally { - reservation.release(); - verificationCompletedFuture.set(null); - } - } - }), - MoreExecutors.directExecutor()); } catch (InterruptedException | ExecutionException e) { flowControlReservationErrorConsumer.apply(e); FlowController.cancelRequest(reservationRequest); @@ -110,7 +83,37 @@ public void onFailure(Throwable throwable) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } + return verificationCompletedFuture; } + + Futures.addCallback( + invokeOperation.get(), + mirroringTracer.spanFactory.wrapWithCurrentSpan( + new FutureCallback() { + @Override + public void onSuccess(@NullableDecl T t) { + try { + Log.trace("starting verification %s", t); + verificationCallback.onSuccess(t); + Log.trace("verification done %s", t); + } finally { + reservation.release(); + verificationCompletedFuture.set(null); + } + } + + @Override + public void onFailure(Throwable throwable) { + try { + verificationCallback.onFailure(throwable); + } finally { + reservation.release(); + verificationCompletedFuture.set(null); + } + } + }), + MoreExecutors.directExecutor()); + return verificationCompletedFuture; } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestHelpers.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestHelpers.java index 16c0fb86c4..868febd1d1 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestHelpers.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestHelpers.java @@ -29,14 +29,20 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Table; +import org.mockito.ArgumentMatchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -92,6 +98,10 @@ public static List createGets(String... keys) { return result; } + public static Delete createDelete(String row) { + return new Delete(row.getBytes()); + } + public static Cell createCell( String row, String family, String qualifier, long timestamp, Type type, String value) { return CellUtil.createCell( @@ -155,4 +165,68 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { }); return secondaryOperationAllowedFuture; } + + public static Map mapOf(Object... keyValuePairs) { + assert keyValuePairs.length % 2 == 0; + Map mapping = new HashMap<>(); + for (int i = 0; i < keyValuePairs.length; i += 2) { + mapping.put(keyValuePairs[i], keyValuePairs[i + 1]); + } + return mapping; + } + + public static void mockBatch(Table table1, Table table2, Object... keyValuePairs) + throws IOException, InterruptedException { + mockBatch(table1, keyValuePairs); + mockBatch(table2, keyValuePairs); + } + + public static void mockBatch(Table table, Object... keyValuePairs) + throws IOException, InterruptedException { + + lenient() + .doAnswer(createMockBatchAnswer(keyValuePairs)) + .when(table) + .batch(ArgumentMatchers.anyList(), any(Object[].class)); + } + + public static Answer createMockBatchAnswer(final Object... keyValuePairs) { + final Map mapping = mapOf(keyValuePairs); + + return new Answer() { + @Override + public Void answer(InvocationOnMock invocationOnMock) throws Throwable { + boolean shouldThrow = false; + Object[] args = invocationOnMock.getArguments(); + List operations = (List) args[0]; + Object[] result = (Object[]) args[1]; + + for (int i = 0; i < operations.size(); i++) { + Row operation = operations.get(i); + if (mapping.containsKey(operation)) { + Object value = mapping.get(operation); + result[i] = value; + if (value instanceof Throwable) { + shouldThrow = true; + } + } else if (operation instanceof Get) { + Get get = (Get) operation; + result[i] = createResult(get.getRow(), get.getRow()); + } else { + result[i] = Result.create(new Cell[0]); + } + } + if (shouldThrow) { + throw new IOException(); + } + return null; + } + }; + } + + public static ArrayList asArrayList(T... entries) { + ArrayList arrayList = new ArrayList<>(); + arrayList.addAll(Arrays.asList(entries)); + return arrayList; + } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java index 454ae3a900..fb6f8c2296 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java @@ -18,6 +18,7 @@ import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createGet; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createPut; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createResult; +import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.mockBatch; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.setupFlowControllerMock; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.MIRRORING_LATENCY; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.PRIMARY_ERRORS; @@ -31,8 +32,6 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -107,6 +106,7 @@ public void setUp() { new SecondaryWriteErrorConsumerWithMetrics( tracer, mock(SecondaryWriteErrorConsumer.class)), new ReadSampler(100), + false, tracer)); } @@ -230,22 +230,26 @@ public void testSecondaryErrorMetricIsRecorded() throws IOException { } @Test - public void testSingleWriteErrorMetricIsRecorded() throws IOException { + public void testSingleWriteErrorMetricIsRecorded() throws IOException, InterruptedException { Put put = createPut("test", "f1", "q1", "v1"); - doNothing().when(primaryTable).put(put); - doThrow(new IOException("test exception")).when(secondaryTable).put(put); + mockBatch(primaryTable, put, new Result()); + mockBatch(secondaryTable, put, new IOException("test exception")); mirroringTable.put(put); executorServiceRule.waitForExecutor(); verify(mirroringMetricsRecorder, times(1)) .recordOperation( - eq(HBaseOperation.PUT), eq(PRIMARY_LATENCY), anyLong(), eq(PRIMARY_ERRORS), eq(false)); + eq(HBaseOperation.BATCH), + eq(PRIMARY_LATENCY), + anyLong(), + eq(PRIMARY_ERRORS), + eq(false)); verify(mirroringMetricsRecorder, times(1)) .recordOperation( - eq(HBaseOperation.PUT), + eq(HBaseOperation.BATCH), eq(SECONDARY_LATENCY), anyLong(), eq(SECONDARY_ERRORS), @@ -256,7 +260,7 @@ public void testSingleWriteErrorMetricIsRecorded() throws IOException { verify(mirroringMetricsRecorder, never()) .recordReadMismatches(any(HBaseOperation.class), anyInt()); - verify(mirroringMetricsRecorder, times(1)).recordWriteMismatches(HBaseOperation.PUT, 1); + verify(mirroringMetricsRecorder, times(1)).recordWriteMismatches(HBaseOperation.BATCH, 1); } @Test diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java index 11a52cf56b..ea311a3359 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java @@ -15,10 +15,13 @@ */ package com.google.cloud.bigtable.mirroring.hbase1_x; +import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createDelete; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createGet; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createGets; +import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createMockBatchAnswer; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createPut; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.createResult; +import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.mockBatch; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.setupFlowControllerMock; import static com.google.cloud.bigtable.mirroring.hbase1_x.TestHelpers.setupFlowControllerToRejectRequests; import static com.google.common.truth.Truth.assertThat; @@ -29,7 +32,6 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -42,20 +44,24 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.ReadSampler; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.SecondaryWriteErrorConsumerWithMetrics; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; @@ -82,7 +88,9 @@ import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; +import org.mockito.InOrder; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -117,6 +125,7 @@ public void setUp() { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + false, new MirroringTracer())); } @@ -541,29 +550,27 @@ public void run() { @Test public void testPutIsMirrored() throws IOException, InterruptedException { Put put = createPut("test", "f1", "q1", "v1"); - List puts = Arrays.asList(put); - - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] args = invocationOnMock.getArguments(); - Object[] result = (Object[]) args[1]; + List puts = new ArrayList<>(); + puts.add(put); - // secondary - result[0] = Result.create(new Cell[0]); - return null; - } - }) - .when(primaryTable) - .batch(eq(puts), any(Object[].class)); + mockBatch(primaryTable, put, new Result()); mirroringTable.put(put); - mirroringTable.put(puts); executorServiceRule.waitForExecutor(); - verify(primaryTable, times(1)).put(put); - verify(secondaryTable, times(1)).put(put); + verify(primaryTable, times(1)).batch(eq(puts), any(Object[].class)); + verify(secondaryTable, times(1)).batch(eq(puts), any(Object[].class)); + } + + @Test + public void testPutListIsMirrored() throws IOException, InterruptedException { + Put put = createPut("test", "f1", "q1", "v1"); + List puts = Arrays.asList(put); + + mockBatch(primaryTable, put, new Result()); + + mirroringTable.put(puts); + executorServiceRule.waitForExecutor(); // put(List) is mirrored using batch, we because we have to detect partially applied // writes. @@ -572,9 +579,9 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable { } @Test - public void testPutWithErrorIsNotMirrored() throws IOException { + public void testPutWithErrorIsNotMirrored() throws IOException, InterruptedException { final Put put = createPut("test", "f1", "q1", "v1"); - doThrow(new IOException("test exception")).when(primaryTable).put(put); + mockBatch(this.primaryTable, put, new IOException("test")); assertThrows( IOException.class, @@ -586,30 +593,25 @@ public void run() throws Throwable { }); executorServiceRule.waitForExecutor(); - verify(primaryTable, times(1)).put(put); - verify(secondaryTable, times(0)).put(put); + verify(primaryTable, times(1)).batch(eq(Arrays.asList(put)), any(Object[].class)); + verify(secondaryTable, never()).batch(ArgumentMatchers.anyList(), any(Object[].class)); } @Test - public void testPutWithSecondaryErrorCallsErrorHandler() throws IOException { + public void testPutWithSecondaryErrorCallsErrorHandler() + throws IOException, InterruptedException { Put put = createPut("test", "f1", "q1", "v1"); - doThrow(new IOException("test exception")).when(secondaryTable).put(put); - doNothing().when(primaryTable).put(put); + mockBatch(primaryTable); + mockBatch(secondaryTable, put, new IOException("test exception")); mirroringTable.put(put); executorServiceRule.waitForExecutor(); - verify(primaryTable, times(1)).put(put); - verify(secondaryTable, times(1)).put(put); + verify(primaryTable, times(1)).batch(eq(Collections.singletonList(put)), any(Object[].class)); + verify(secondaryTable, times(1)).batch(eq(Collections.singletonList(put)), any(Object[].class)); - ArgumentCaptor argument1 = ArgumentCaptor.forClass(HBaseOperation.class); - ArgumentCaptor> argument2 = ArgumentCaptor.forClass(List.class); verify(secondaryWriteErrorConsumer, times(1)) - .consume(argument1.capture(), argument2.capture(), any(Throwable.class)); - assertThat(argument2.getValue().size()).isEqualTo(1); - assertThat(argument2.getValue().get(0)).isEqualTo(put); - - assertThat(argument1.getValue()).isEqualTo(HBaseOperation.PUT); + .consume(eq(HBaseOperation.BATCH), eq(put), any(Throwable.class)); } @Test @@ -625,31 +627,13 @@ public void testBatchGetAndPutGetsAreVerifiedOnSuccess() // get1 | ok | ok final Result get1Result = createResult("get1", "value1"); - final Result get3Result = createResult("get3", "value3"); // primary Object[] results = new Object[2]; - results[0] = Result.create(new Cell[0]); - results[1] = get1Result; List secondaryRequests = Arrays.asList(new Row[] {put1, get1}); - doNothing().when(primaryTable).batch(ArgumentMatchers.anyList(), any(Object[].class)); - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] args = invocationOnMock.getArguments(); - Object[] result = (Object[]) args[1]; - - // secondary - result[0] = Result.create(new Cell[0]); - result[1] = get1Result; - return null; - } - }) - .when(secondaryTable) - .batch(eq(secondaryRequests), any(Object[].class)); + mockBatch(primaryTable, secondaryTable, put1, Result.create(new Cell[0]), get1, get1Result); mirroringTable.batch(requests, results); executorServiceRule.waitForExecutor(); @@ -689,41 +673,38 @@ public void testBatchGetAndPut() throws IOException, InterruptedException { // primary Object[] results = new Object[6]; - results[0] = Result.create(new Cell[0]); // put1 - ok - results[1] = null; // put2 - failed - results[2] = Result.create(new Cell[0]); // put3 - ok - results[3] = get1Result; // get1 - ok - results[4] = null; // get2 - fail - results[5] = get3Result; // get3 - ok + mockBatch( + primaryTable, + put1, + Result.create(new Cell[0]), + put2, + new IOException("test1"), + put3, + Result.create(new Cell[0]), + get1, + get1Result, + get2, + new IOException("test1"), + get3, + get3Result); List secondaryRequests = Arrays.asList(new Row[] {put1, put3, get1, get3}); - doThrow(new IOException("test1")) - .when(primaryTable) - .batch(ArgumentMatchers.anyList(), any(Object[].class)); - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] args = invocationOnMock.getArguments(); - Object[] result = (Object[]) args[1]; - - // secondary - result[0] = null; // put1 failed on secondary - result[1] = Result.create(new Cell[0]); // put3 ok on secondary - result[2] = new IOException("test"); // get1 - failed on secondary - result[3] = get3Result; // get3 - ok - throw new IOException("test2"); - } - }) - .when(secondaryTable) - .batch(eq(secondaryRequests), any(Object[].class)); + mockBatch( + secondaryTable, + put1, + null, + put3, + Result.create(new Cell[0]), + get1, + new IOException("test"), + get3, + get3Result); try { mirroringTable.batch(requests, results); fail("should have thrown"); - } catch (IOException e) { - assertThat(e).hasMessageThat().contains("test1"); + } catch (IOException ignored) { } executorServiceRule.waitForExecutor(); @@ -758,35 +739,17 @@ public void testBatchGetsPrimaryFailsSecondaryOk() throws IOException, Interrupt final Result get2Result = createResult("get2", "value2"); // primary - Object[] results = new Object[6]; - results[0] = null; - results[1] = get2Result; + Object[] results = new Object[2]; List secondaryRequests = Arrays.asList(new Row[] {get2}); - doThrow(new IOException("test1")) - .when(primaryTable) - .batch(ArgumentMatchers.anyList(), any(Object[].class)); - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] args = invocationOnMock.getArguments(); - Object[] result = (Object[]) args[1]; - - // secondary - result[0] = get2Result; - return null; - } - }) - .when(secondaryTable) - .batch(eq(secondaryRequests), any(Object[].class)); + mockBatch(primaryTable, get1, new IOException("test"), get2, get2Result); + mockBatch(secondaryTable, get2, get2Result); try { mirroringTable.batch(requests, results); fail("should have thrown"); - } catch (IOException e) { - assertThat(e).hasMessageThat().contains("test1"); + } catch (IOException ignored) { } executorServiceRule.waitForExecutor(); @@ -923,38 +886,41 @@ public void testCheckAndMutate() throws IOException { @Test public void testDelete() throws IOException, InterruptedException { Delete delete = new Delete("r1".getBytes()); + + mockBatch(primaryTable, secondaryTable, delete, new Result()); + mirroringTable.delete(delete); + executorServiceRule.waitForExecutor(); + verify(primaryTable, times(1)).batch(eq(Arrays.asList(delete)), any(Object[].class)); + verify(secondaryTable, times(1)).batch(eq(Arrays.asList(delete)), any(Object[].class)); + } + + @Test + public void testDeleteList() throws IOException, InterruptedException { List deletes = new ArrayList<>(); - deletes.add(delete); + deletes.add(new Delete("r1".getBytes())); - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] args = invocationOnMock.getArguments(); - Object[] result = (Object[]) args[1]; + List originalDeletes = new ArrayList<>(deletes); - // secondary - result[0] = Result.create(new Cell[0]); - return null; - } - }) - .when(primaryTable) - .batch(eq(deletes), any(Object[].class)); + mockBatch(primaryTable, secondaryTable, deletes.get(0), new Result()); mirroringTable.delete(deletes); + assertThat(deletes).isEmpty(); executorServiceRule.waitForExecutor(); - verify(secondaryTable, times(1)).delete(delete); - verify(secondaryTable, times(1)).batch(eq(Arrays.asList(delete)), any(Object[].class)); + verify(primaryTable, times(1)).batch(eq(originalDeletes), any(Object[].class)); + verify(secondaryTable, times(1)).batch(eq(originalDeletes), any(Object[].class)); } @Test - public void testMutateRow() throws IOException { + public void testMutateRow() throws IOException, InterruptedException { RowMutations mutations = new RowMutations("r1".getBytes()); + List listOfMutations = Arrays.asList(mutations); + mockBatch(primaryTable, secondaryTable, mutations, new Result()); mirroringTable.mutateRow(mutations); executorServiceRule.waitForExecutor(); - verify(secondaryTable, times(1)).mutateRow(mutations); + verify(primaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class)); + verify(secondaryTable, times(1)).batch(eq(listOfMutations), any(Object[].class)); } @Test @@ -981,6 +947,7 @@ public void testIncrement() throws IOException { executorServiceRule.waitForExecutor(); ArgumentCaptor argument = ArgumentCaptor.forClass(Put.class); + verify(primaryTable, times(3)).increment(any(Increment.class)); verify(secondaryTable, never()).increment(any(Increment.class)); verify(secondaryTable, times(3)).put(argument.capture()); @@ -1008,6 +975,7 @@ public void testAppend() throws IOException { CellUtil.createCell(row, family, qualifier, ts, Type.Put.getCode(), value) })); mirroringTable.append(append); + verify(primaryTable, times(1)).append(append); executorServiceRule.waitForExecutor(); ArgumentCaptor argument = ArgumentCaptor.forClass(Put.class); @@ -1035,11 +1003,17 @@ private void assertPutsAreEqual(Put expectedPut, Put value) { @Test public void testBatchWithCallback() throws IOException, InterruptedException { List mutations = Arrays.asList(createGet("get1")); - Object[] results = new Object[] {createResult("test")}; + Object[] expectedResults = new Object[] {createResult("test")}; + Object[] results = new Object[] {null}; + + doAnswer(createMockBatchAnswer(mutations.get(0), expectedResults[0])) + .when(primaryTable) + .batchCallback(ArgumentMatchers.anyList(), any(Object[].class), any(Callback.class)); + Callback callback = mock(Callback.class); mirroringTable.batchCallback(mutations, results, callback); executorServiceRule.waitForExecutor(); - verify(primaryTable, times(1)).batchCallback(mutations, results, callback); + verify(primaryTable, times(1)).batchCallback(mutations, expectedResults, callback); verify(secondaryTable, times(1)).batch(eq(mutations), any(Object[].class)); } @@ -1092,18 +1066,22 @@ public void testFlowControllerExceptionInGetPreventsSecondaryOperation() throws } @Test - public void testFlowControllerExceptionInPutExecutesWriteErrorHandler() throws IOException { + public void testFlowControllerExceptionInPutExecutesWriteErrorHandler() + throws IOException, InterruptedException { setupFlowControllerToRejectRequests(flowController); Put request = createPut("test", "f1", "q1", "v1"); + mockBatch(primaryTable, request, new Result()); + mirroringTable.put(request); executorServiceRule.waitForExecutor(); - verify(primaryTable, times(1)).put(request); - verify(secondaryTable, never()).get(any(Get.class)); + verify(primaryTable, times(1)) + .batch(eq(Collections.singletonList(request)), any(Object[].class)); + verify(secondaryTable, never()).batch(ArgumentMatchers.anyList(), any(Object[].class)); verify(secondaryWriteErrorConsumer, times(1)) - .consume(eq(HBaseOperation.PUT), eq(ImmutableList.of(request)), any(Throwable.class)); + .consume(eq(HBaseOperation.BATCH), eq(ImmutableList.of(request)), any(Throwable.class)); } @Test @@ -1113,24 +1091,17 @@ public void testFlowControllerExceptionInBatchExecutesWriteErrorHandler() Put put1 = createPut("test0", "f1", "q1", "v1"); Put put2 = createPut("test1", "f1", "q2", "v1"); - List request = ImmutableList.of(put1, put2, createGet("test2")); - - doAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocationOnMock) throws Throwable { - Object[] args = invocationOnMock.getArguments(); - Object[] result = (Object[]) args[1]; - - // secondary - result[0] = Result.create(new Cell[0]); - result[1] = Result.create(new Cell[0]); - result[2] = Result.create(new Cell[0]); - return null; - } - }) - .when(primaryTable) - .batch(eq(request), any(Object[].class)); + Get get1 = createGet("test2"); + List request = ImmutableList.of(put1, put2, get1); + + mockBatch( + primaryTable, + put1, + Result.create(new Cell[0]), + put2, + Result.create(new Cell[0]), + get1, + Result.create(new Cell[0])); Object[] results = new Object[3]; mirroringTable.batch(request, results); @@ -1149,15 +1120,21 @@ public void testBatchWithAppendsAndIncrements() throws IOException, InterruptedE Append append = new Append("a".getBytes()); append.add("f".getBytes(), "q".getBytes(), "v".getBytes()); - List operations = - Arrays.asList(increment, append, createPut("p", "f", "q", "v"), createGet("g")); - Object[] results = - new Object[] { - createResult("i", "f", "q", 1, "1"), - createResult("a", "f", "q", 2, "2"), - new Result(), - createResult("g", "f", "q", 3, "3"), - }; + + Put put = createPut("p", "f", "q", "v"); + Get get = createGet("g"); + List operations = Arrays.asList(increment, append, put, get); + mockBatch( + primaryTable, + increment, + createResult("i", "f", "q", 1, "1"), + append, + createResult("a", "f", "q", 2, "2"), + put, + new Result(), + get, + createResult("g", "f", "q", 3, "3")); + Object[] results = new Object[operations.size()]; List expectedSecondaryOperations = Arrays.asList( @@ -1179,4 +1156,243 @@ public void testBatchWithAppendsAndIncrements() throws IOException, InterruptedE (Put) argumentCaptor.getValue().get(2), (Put) expectedSecondaryOperations.get(2)); assertThat(argumentCaptor.getValue().get(3)).isEqualTo(expectedSecondaryOperations.get(3)); } + + @Test + public void testConcurrentWritesAreFlowControlledBeforePrimaryAction() + throws IOException, InterruptedException { + this.mirroringTable = + spy( + new MirroringTable( + primaryTable, + secondaryTable, + this.executorServiceRule.executorService, + mismatchDetector, + flowController, + secondaryWriteErrorConsumer, + new ReadSampler(100), + true, + new MirroringTracer())); + + Put put1 = createPut("r1", "f1", "q1", "v1"); + + // Both batches should be called even if first one fails. + mockBatch(primaryTable, secondaryTable, put1, new IOException()); + + InOrder inOrder = Mockito.inOrder(primaryTable, flowController); + try { + this.mirroringTable.put(put1); + fail("should fail"); + } catch (IOException ignored) { + } + inOrder.verify(flowController).asyncRequestResource(any(RequestResourcesDescription.class)); + inOrder.verify(primaryTable).batch(eq(Arrays.asList(put1)), any(Object[].class)); + + verify(primaryTable).batch(eq(Arrays.asList(put1)), any(Object[].class)); + verify(secondaryTable).batch(eq(Arrays.asList(put1)), any(Object[].class)); + } + + @Test + public void testNonConcurrentOpsWontBePerformedConcurrently() + throws IOException, InterruptedException { + setupMirroringTableWithDirectExecutor(); + Get get = createGet("get1"); + Increment increment = new Increment("row".getBytes()); + Append append = new Append("row".getBytes()); + + Put put = createPut("test1", "f1", "q1", "v1"); + Delete delete = createDelete("test2"); + + mockBatch( + primaryTable, + secondaryTable, + get, + createResult(), + increment, + createResult("row", "v1"), + append, + createResult("row", "v2")); + + checkBatchCalledSequentially(Arrays.asList(get)); + checkBatchCalledSequentially(Arrays.asList(increment)); + checkBatchCalledSequentially(Arrays.asList(append)); + + checkBatchCalledConcurrently(Arrays.asList(put)); + checkBatchCalledConcurrently(Arrays.asList(delete)); + checkBatchCalledConcurrently(Arrays.asList(put, delete)); + + checkBatchCalledSequentially(Arrays.asList(put, delete, get)); + checkBatchCalledSequentially(Arrays.asList(put, delete, increment)); + checkBatchCalledSequentially(Arrays.asList(put, delete, append)); + } + + private void setupMirroringTableWithDirectExecutor() { + this.mirroringTable = + spy( + new MirroringTable( + primaryTable, + secondaryTable, + MoreExecutors.listeningDecorator(MoreExecutors.newDirectExecutorService()), + mismatchDetector, + flowController, + secondaryWriteErrorConsumer, + new ReadSampler(100), + true, + new MirroringTracer())); + } + + private void checkBatchCalledSequentially(List requests) + throws IOException, InterruptedException { + InOrder inOrder = Mockito.inOrder(primaryTable, flowController, secondaryTable); + this.mirroringTable.batch(requests, new Object[requests.size()]); + inOrder.verify(primaryTable).batch(eq(requests), any(Object[].class)); + inOrder.verify(flowController).asyncRequestResource(any(RequestResourcesDescription.class)); + inOrder.verify(secondaryTable).batch(ArgumentMatchers.anyList(), any(Object[].class)); + } + + private void checkBatchCalledConcurrently(List requests) + throws IOException, InterruptedException { + InOrder inOrder = Mockito.inOrder(primaryTable, flowController, secondaryTable); + this.mirroringTable.batch(requests, new Object[requests.size()]); + inOrder.verify(flowController).asyncRequestResource(any(RequestResourcesDescription.class)); + inOrder.verify(secondaryTable).batch(eq(requests), any(Object[].class)); + inOrder.verify(primaryTable).batch(eq(requests), any(Object[].class)); + } + + @Test + public void testConcurrentWritesWithErrors() throws IOException, InterruptedException { + setupMirroringTableWithDirectExecutor(); + + Put put1 = createPut("test1", "f1", "q1", "v1"); + Put put2 = createPut("test2", "f2", "q2", "v2"); + Put put3 = createPut("test3", "f3", "q3", "v3"); + Put put4 = createPut("test4", "f4", "q4", "v4"); + Delete delete1 = createDelete("delete1"); + Delete delete2 = createDelete("delete2"); + Delete delete3 = createDelete("delete3"); + Delete delete4 = createDelete("delete4"); + + List requests = + Arrays.asList(put1, put2, put3, put4, delete1, delete2, delete3, delete4); + + // | p1 | p2 | p3 | p4 | d1 | d2 | d3 | d4 + // primary | v | v | x | x | v | v | x | x + // secondary | v | x | v | x | v | x | v | y + // p. error | | | x | | | | x | + // s. error | | x | | | | x | | + + IOException put2exception = new IOException("put2"); + IOException put3exception = new IOException("put3"); + IOException put4exception = new IOException("put4"); + + IOException delete2exception = new IOException("delete2"); + IOException delete3exception = new IOException("delete3"); + IOException delete4exception = new IOException("delete4"); + + mockBatch( + primaryTable, + put1, + new Result(), + put2, + new Result(), + put3, + put3exception, + put4, + put4exception, + delete1, + new Result(), + delete2, + new Result(), + delete3, + delete3exception, + delete4, + delete4exception); + + mockBatch( + secondaryTable, + put1, + new Result(), + put2, + put2exception, + put3, + new Result(), + put4, + put4exception, + delete1, + new Result(), + delete2, + delete2exception, + delete3, + new Result(), + delete4, + delete4exception); + + Object[] results = new Object[8]; + try { + this.mirroringTable.batch(requests, results); + fail("should throw"); + } catch (IOException ignored) { + } + + verify(secondaryWriteErrorConsumer, times(1)) + .consume(HBaseOperation.BATCH, put2, put2exception); + verify(secondaryWriteErrorConsumer, times(1)) + .consume(HBaseOperation.BATCH, delete2, delete2exception); + } + + @Test + public void testConcurrentOpsAreRunConcurrently() throws IOException, InterruptedException { + setupMirroringTableWithDirectExecutor(); + + Put put = createPut("test1", "f1", "q1", "v1"); + mockBatch(primaryTable, secondaryTable); + + final SettableFuture bothRun = SettableFuture.create(); + final AtomicBoolean firstRun = new AtomicBoolean(false); + final AtomicBoolean oneWaited = new AtomicBoolean(false); + + Answer answer = + new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + if (firstRun.getAndSet(true)) { + bothRun.set(null); + } else { + oneWaited.set(true); + } + bothRun.get(3, TimeUnit.SECONDS); + return null; + } + }; + + doAnswer(answer).when(primaryTable).batch(ArgumentMatchers.anyList(), any(Object[].class)); + doAnswer(answer) + .when(secondaryTable) + .batch(ArgumentMatchers.anyList(), any(Object[].class)); + + mirroringTable.put(put); + + verify(primaryTable, times(1)).batch(ArgumentMatchers.anyList(), any(Object[].class)); + verify(secondaryTable, times(1)).batch(ArgumentMatchers.anyList(), any(Object[].class)); + assertThat(oneWaited.get()).isTrue(); + } + + @Test + public void testConcurrentOpsAreNotPerformedWhenFlowControllerRejectsRequest() + throws IOException, InterruptedException { + IOException flowControllerExpection = setupFlowControllerToRejectRequests(flowController); + setupMirroringTableWithDirectExecutor(); + + Put put = createPut("test1", "f1", "q1", "v1"); + try { + mirroringTable.put(put); + fail("should throw"); + } catch (IOException e) { + // FlowController exception is wrapped in IOException by mirroringTable and in + // ExecutionException by a future. + assertThat(e).hasCauseThat().hasCauseThat().isEqualTo(flowControllerExpection); + } + + verify(primaryTable, never()).batch(ArgumentMatchers.anyList(), any(Object[].class)); + verify(secondaryTable, never()).batch(ArgumentMatchers.anyList(), any(Object[].class)); + } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java index a0f51e2ad8..e2d527869c 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java @@ -88,6 +88,7 @@ public void setUp() throws IOException, InterruptedException { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + false, new MirroringTracer())); mockExistsAll(this.primaryTable); diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java index c657b43f0d..9dfe8b09d4 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java @@ -90,6 +90,7 @@ public void setUp() { flowController, secondaryWriteErrorConsumer, readSampler, + false, new MirroringTracer())); }