From 5317ac6cb78b98ea06928b8d0dbca51cb5e651d4 Mon Sep 17 00:00:00 2001 From: Mateusz Walkiewicz Date: Fri, 21 Jan 2022 16:26:37 +0100 Subject: [PATCH] feat: Client-side timestamping --- .../hbase/mirroring/utils/Helpers.java | 2 +- .../hbase1_x/MirroringConnection.java | 9 + .../mirroring/hbase1_x/MirroringOptions.java | 7 + .../mirroring/hbase1_x/MirroringTable.java | 6 + .../ConcurrentMirroringBufferedMutator.java | 3 + .../MirroringBufferedMutator.java | 13 +- .../SequentialMirroringBufferedMutator.java | 3 + .../mirroring/hbase1_x/utils/Batcher.java | 7 +- .../utils/MirroringConfigurationHelper.java | 28 +++ .../utils/timestamper/CopyingTimestamper.java | 136 +++++++++++++ .../utils/timestamper/InPlaceTimestamper.java | 77 +++++++ .../utils/timestamper/MonotonicTimer.java | 43 ++++ .../utils/timestamper/NoopTimestamper.java | 47 +++++ .../utils/timestamper/TimestampUtils.java | 48 +++++ .../utils/timestamper/Timestamper.java | 39 ++++ .../utils/timestamper/TimestamperFactory.java | 29 +++ .../utils/timestamper/TimestampingMode.java | 22 ++ .../hbase1_x/TestMirroringMetrics.java | 4 + .../hbase1_x/TestMirroringTable.java | 7 + .../TestMirroringTableInputModification.java | 4 + .../TestMirroringTableSynchronousMode.java | 4 + .../hbase1_x/TestVerificationSampling.java | 4 + ...estConcurrentMirroringBufferedMutator.java | 5 + .../TestMirroringBufferedMutator.java | 5 + ...estSequentialMirroringBufferedMutator.java | 4 + .../timestamper/TestCopyingTimestamper.java | 190 ++++++++++++++++++ .../timestamper/TestInPlaceTimestamper.java | 190 ++++++++++++++++++ .../hbase/mirroring/TestBlocking.java | 2 +- .../hbase/mirroring/TestErrorDetection.java | 2 +- .../mirroring/TestMirroringAsyncTable.java | 2 +- .../MirroringAsyncBufferedMutator.java | 7 +- .../hbase2_x/MirroringAsyncConnection.java | 10 +- .../hbase2_x/MirroringAsyncTable.java | 9 + .../hbase2_x/MirroringConnection.java | 1 + .../mirroring/hbase2_x/MirroringTable.java | 3 + .../TestMirroringAsyncBufferedMutator.java | 9 +- .../hbase2_x/TestMirroringAsyncTable.java | 4 + ...tMirroringAsyncTableInputModification.java | 4 + .../hbase2_x/TestVerificationSampling.java | 4 + quickstart.md | 13 +- 40 files changed, 996 insertions(+), 10 deletions(-) create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/CopyingTimestamper.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/InPlaceTimestamper.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimer.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/NoopTimestamper.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampUtils.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/Timestamper.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestamperFactory.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampingMode.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestCopyingTimestamper.java create mode 100644 bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestInPlaceTimestamper.java diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java index c6097bfb42..512927e0cb 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java @@ -43,7 +43,7 @@ public static Put createPut( public static Put createPut(int id, byte[] family, byte[] qualifier) { byte[] rowAndValue = Longs.toByteArray(id); - return createPut(rowAndValue, family, qualifier, id, rowAndValue); + return createPut(rowAndValue, family, qualifier, rowAndValue); } public static Get createGet(byte[] row, byte[] family, byte[] qualifier) { diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java index 9ae7634dd8..0bdf56cc34 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java @@ -26,6 +26,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestamperFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -90,6 +92,8 @@ public class MirroringConnection implements Connection { protected final boolean performWritesConcurrently; protected final boolean waitForSecondaryWrites; + // Depending on configuration, ensures that mutations have an explicitly set timestamp. + protected final Timestamper timestamper; /** * The constructor called from {@link @@ -162,6 +166,9 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate); this.performWritesConcurrently = this.configuration.mirroringOptions.performWritesConcurrently; this.waitForSecondaryWrites = this.configuration.mirroringOptions.waitForSecondaryWrites; + this.timestamper = + TimestamperFactory.create( + this.configuration.mirroringOptions.enableDefaultClientSideTimestamps); } @Override @@ -198,6 +205,7 @@ public Table call() throws IOException { this.flowController, this.secondaryWriteErrorConsumer, this.readSampler, + this.timestamper, this.performWritesConcurrently, this.waitForSecondaryWrites, this.mirroringTracer, @@ -223,6 +231,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams bufferedMutatorP flowController, executorService, secondaryWriteErrorConsumer, + timestamper, mirroringTracer); } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java index 74dd65fafc..243fdfef53 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java @@ -18,6 +18,7 @@ import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_BUFFERED_MUTATOR_BYTES_TO_FLUSH; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONCURRENT_WRITES; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONNECTION_CONNECTION_TERMINATION_TIMEOUT; +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_DROP_ON_OVERFLOW_KEY; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_MAX_BUFFER_SIZE_KEY; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_PREFIX_PATH_KEY; @@ -42,6 +43,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.faillog.Serializer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowControlStrategy; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestCountingFlowControlStrategy; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestampingMode; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.base.Preconditions; @@ -106,6 +108,7 @@ public static class Faillog { public final Faillog faillog; public final int resultScannerBufferedMismatchedResults; + public final TimestampingMode enableDefaultClientSideTimestamps; public MirroringOptions(Configuration configuration) { this.mismatchDetectorFactoryClass = @@ -153,5 +156,9 @@ public MirroringOptions(Configuration configuration) { "Performing writes concurrently and not waiting for writes is forbidden. " + "It has no advantage over performing writes asynchronously and not waiting for them."); this.faillog = new Faillog(configuration); + + this.enableDefaultClientSideTimestamps = + configuration.getEnum( + MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, TimestampingMode.inplace); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java index 59c4e76b64..893ad21dd2 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java @@ -36,6 +36,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.HierarchicalReferenceCounter; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory; import com.google.common.annotations.VisibleForTesting; @@ -114,6 +115,7 @@ public boolean apply(@NullableDecl Object o) { private final AtomicBoolean closed = new AtomicBoolean(false); private final SettableFuture closedFuture = SettableFuture.create(); private final int resultScannerBufferedMismatchedResults; + private final Timestamper timestamper; /** * @param executorService ExecutorService is used to perform operations on secondaryTable and * verification tasks. @@ -129,6 +131,7 @@ public MirroringTable( FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, ReadSampler readSampler, + Timestamper timestamper, boolean performWritesConcurrently, boolean waitForSecondaryWrites, MirroringTracer mirroringTracer, @@ -148,6 +151,7 @@ public MirroringTable( this.mirroringTracer = mirroringTracer; this.requestScheduler = new RequestScheduler(flowController, this.mirroringTracer, this.referenceCounter); + this.timestamper = timestamper; this.batcher = new Batcher( this.primaryTable, @@ -156,6 +160,7 @@ public MirroringTable( this.secondaryWriteErrorConsumer, this.verificationContinuationFactory, this.readSampler, + this.timestamper, resultIsFaultyPredicate, waitForSecondaryWrites, performWritesConcurrently, @@ -545,6 +550,7 @@ private boolean checkAndMutateWithSpan( final byte[] value, final RowMutations rowMutations) throws IOException { + this.timestamper.fillTimestamp(rowMutations); boolean wereMutationsApplied = this.mirroringTracer.spanFactory.wrapPrimaryOperation( new CallableThrowingIOException() { diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java index 3430fe5c3a..5e69d9b876 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java @@ -22,6 +22,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; import com.google.common.util.concurrent.FutureCallback; @@ -78,6 +79,7 @@ public ConcurrentMirroringBufferedMutator( BufferedMutatorParams bufferedMutatorParams, MirroringConfiguration configuration, ExecutorService executorService, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { super( @@ -86,6 +88,7 @@ public ConcurrentMirroringBufferedMutator( bufferedMutatorParams, configuration, executorService, + timestamper, mirroringTracer); } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java index c138355725..250c6d723b 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java @@ -24,6 +24,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -60,6 +61,9 @@ */ @InternalApi("For internal usage only") public abstract class MirroringBufferedMutator implements BufferedMutator { + + private final Timestamper timestamper; + public static BufferedMutator create( boolean concurrent, Connection primaryConnection, @@ -69,6 +73,7 @@ public static BufferedMutator create( FlowController flowController, ExecutorService executorService, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { if (concurrent) { @@ -78,6 +83,7 @@ public static BufferedMutator create( bufferedMutatorParams, configuration, executorService, + timestamper, mirroringTracer); } else { return new SequentialMirroringBufferedMutator( @@ -88,6 +94,7 @@ public static BufferedMutator create( flowController, executorService, secondaryWriteErrorConsumer, + timestamper, mirroringTracer); } } @@ -121,6 +128,7 @@ public MirroringBufferedMutator( BufferedMutatorParams bufferedMutatorParams, MirroringConfiguration configuration, ExecutorService executorService, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { this.userListener = bufferedMutatorParams.getListener(); @@ -167,12 +175,14 @@ public void onException( this.mutationsBufferFlushThresholdBytes, this.ongoingFlushesCounter, this.mirroringTracer); + this.timestamper = timestamper; } @Override public void mutate(Mutation mutation) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.BUFFERED_MUTATOR_MUTATE)) { + mutation = timestamper.fillTimestamp(mutation); mutateScoped(Collections.singletonList(mutation)); } } @@ -182,7 +192,8 @@ public void mutate(final List list) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope( HBaseOperation.BUFFERED_MUTATOR_MUTATE_LIST)) { - mutateScoped(list); + List timestampedList = timestamper.fillTimestamp(list); + mutateScoped(timestampedList); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java index 5b63133634..bbe1146250 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java @@ -26,6 +26,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.collect.MapMaker; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -137,6 +138,7 @@ public SequentialMirroringBufferedMutator( FlowController flowController, ExecutorService executorService, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { super( @@ -145,6 +147,7 @@ public SequentialMirroringBufferedMutator( bufferedMutatorParams, configuration, executorService, + timestamper, mirroringTracer); this.secondaryWriteErrorConsumer = secondaryWriteErrorConsumer; this.flowController = flowController; diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java index 4051509f20..37e1e9cc42 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java @@ -29,6 +29,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -70,6 +71,7 @@ public class Batcher { private final boolean waitForSecondaryWrites; private final boolean performWritesConcurrently; private final MirroringTracer mirroringTracer; + private final Timestamper timestamper; public Batcher( Table primaryTable, @@ -78,6 +80,7 @@ public Batcher( SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, VerificationContinuationFactory verificationContinuationFactory, ReadSampler readSampler, + Timestamper timestamper, Predicate resultIsFaultyPredicate, boolean waitForSecondaryWrites, boolean performWritesConcurrently, @@ -92,6 +95,7 @@ public Batcher( this.waitForSecondaryWrites = waitForSecondaryWrites; this.performWritesConcurrently = performWritesConcurrently; this.mirroringTracer = mirroringTracer; + this.timestamper = timestamper; } public void batchSingleWriteOperation(Row operation) throws IOException { @@ -129,8 +133,9 @@ public void batch( final Object[] results, @Nullable final Callback callback) throws IOException, InterruptedException { + List timestampedInputOperations = timestamper.fillTimestamp(inputOperations); final RewrittenIncrementAndAppendIndicesInfo actions = - new RewrittenIncrementAndAppendIndicesInfo<>(inputOperations); + new RewrittenIncrementAndAppendIndicesInfo<>(timestampedInputOperations); Log.trace( "[%s] batch(operations=%s, results)", this.primaryTable.getName(), actions.operations); diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java index a2748d38ce..4b371afc48 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java @@ -30,6 +30,8 @@ import java.util.Objects; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -280,6 +282,32 @@ public class MirroringConfigurationHelper { public static final String MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS = "google.bigtable.mirroring.result-scanner.buffered-mismatched-reads"; + /** + * Enables timestamping {@link org.apache.hadoop.hbase.client.Put}s without timestamp set based on + * client's host local time. Client-side timestamps assigned by {@link Table}s and {@link + * BufferedMutator}`s created by one {@link Connection} are always increasing, even if system + * clock is moved backwards, for example by NTP or manually by the user. + * + *

There are three possible modes of client-side timestamping: + * + *

    + *
  • disabled - leads to inconsistencies between mirrored databases because timestamps are + * assigned separately on databases' severs. + *
  • inplace - all mutations without timestamp are modified in place and have timestamps + * assigned when submitted to Table or BufferedMutator. If mutation objects are reused then + * COPY mode should be used. + *
  • copy - timestamps are added to `Put`s after copying them, increases CPU load, but + * mutations submitted to Tables and BufferedMutators can be reused. This mode, combined + * with synchronous writes, gives a guarantee that after HBase API call returns submitted + * mutation objects are no longer used and can be safely modified by the user and submitted + * again. + *
+ * + *

Default value: inplace. + */ + public static final String MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS = + "google.bigtable.mirroring.enable-default-client-side-timestamps"; + public static void fillConnectionConfigWithClassImplementation( Configuration connectionConfig, Configuration config, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/CopyingTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/CopyingTimestamper.java new file mode 100644 index 0000000000..b2a2f8dcd0 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/CopyingTimestamper.java @@ -0,0 +1,136 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import com.google.api.core.InternalApi; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableMap; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; + +@InternalApi("For internal use only") +public class CopyingTimestamper implements Timestamper { + private final MonotonicTimer timer = new MonotonicTimer(); + + @Override + public Put fillTimestamp(Put put) { + long timestamp = timer.getCurrentTimeMillis(); + return setPutTimestamp(put, timestamp); + } + + @Override + public RowMutations fillTimestamp(RowMutations rowMutations) { + long timestamp = timer.getCurrentTimeMillis(); + return setRowMutationsTimestamp(rowMutations, timestamp); + } + + @Override + public Mutation fillTimestamp(Mutation mutation) { + if (mutation instanceof Put) { + return fillTimestamp((Put) mutation); + } + return mutation; + } + + @Override + public List fillTimestamp(List list) { + long timestamp = timer.getCurrentTimeMillis(); + List result = new ArrayList<>(); + for (T row : list) { + result.add(setTimestamp(row, timestamp)); + } + return result; + } + + private T setTimestamp(T row, long timestamp) { + // Those casts are totally safe, but there are not subclasses of Put and RowMutations that we + // know of. + if (row instanceof Put) { + return (T) setPutTimestamp((Put) row, timestamp); + } else if (row instanceof RowMutations) { + return (T) setRowMutationsTimestamp((RowMutations) row, timestamp); + } + // Bigtable doesn't support timestamps for Increment and Append and only a specific subset of + // Deletes, let's not modify them. + return row; + } + + private Put setPutTimestamp(Put put, long timestamp) { + Put putCopy = clonePut(put); + TimestampUtils.setPutTimestamp(putCopy, timestamp); + return putCopy; + } + + private Put clonePut(Put toClone) { + // This copy shares Cells with the original. + Put putCopy = new Put(toClone); + cloneFamilyCallMap(putCopy.getFamilyCellMap()); + return putCopy; + } + + private void cloneFamilyCallMap(NavigableMap> familyCellMap) { + for (List cells : familyCellMap.values()) { + cloneCellList(cells); + } + } + + private void cloneCellList(List cells) { + for (int i = 0; i < cells.size(); i++) { + cells.set(i, cloneCell(cells.get(i))); + } + } + + private Cell cloneCell(Cell cell) { + if (!(cell instanceof KeyValue)) { + throw new RuntimeException( + "CopyingTimestamper doesn't support Puts with cells different than the default KeyValue cell."); + } + try { + return ((KeyValue) cell).clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException( + "KeyValue implementation doesn't support clone() method and CopyingTimestamper cannot use it."); + } + } + + private RowMutations setRowMutationsTimestamp(RowMutations rowMutations, long timestamp) { + try { + RowMutations result = new RowMutations(rowMutations.getRow()); + for (Mutation mutation : rowMutations.getMutations()) { + if (mutation instanceof Put) { + result.add(setPutTimestamp((Put) mutation, timestamp)); + } else if (mutation instanceof Delete) { + result.add((Delete) mutation); + } else { + // Only and `Delete`s and `Put`s are supported. + throw new RuntimeException(); + } + } + return result; + } catch (IOException e) { + // IOException is thrown when row of added mutation doesn't match `RowMutation`s row. + // This shouldn't happen. + throw new RuntimeException(e); + } + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/InPlaceTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/InPlaceTimestamper.java new file mode 100644 index 0000000000..6374ed4c10 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/InPlaceTimestamper.java @@ -0,0 +1,77 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import com.google.api.core.InternalApi; +import java.util.List; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; + +@InternalApi("For internal use only") +public class InPlaceTimestamper implements Timestamper { + private final MonotonicTimer timer = new MonotonicTimer(); + + @Override + public Put fillTimestamp(Put put) { + long timestamp = timer.getCurrentTimeMillis(); + TimestampUtils.setPutTimestamp(put, timestamp); + return put; + } + + @Override + public RowMutations fillTimestamp(RowMutations rowMutations) { + long timestamp = timer.getCurrentTimeMillis(); + setRowMutationsTimestamp(rowMutations, timestamp); + return rowMutations; + } + + @Override + public Mutation fillTimestamp(Mutation mutation) { + if (mutation instanceof Put) { + return fillTimestamp((Put) mutation); + } + return mutation; + } + + @Override + public List fillTimestamp(List list) { + long timestamp = timer.getCurrentTimeMillis(); + for (T r : list) { + setTimestamp(r, timestamp); + } + return list; + } + + private void setTimestamp(Row row, long timestamp) { + // Those casts are totally safe, but there are not subclasses of Put and RowMutations that we + // know of. + if (row instanceof Put) { + TimestampUtils.setPutTimestamp((Put) row, timestamp); + } else if (row instanceof RowMutations) { + setRowMutationsTimestamp((RowMutations) row, timestamp); + } + // Bigtable doesn't support timestamps for Increment and Append and only a specific subset of + // Deletes, let's not modify them. + } + + private void setRowMutationsTimestamp(RowMutations rowMutations, long timestamp) { + for (Mutation mutation : rowMutations.getMutations()) { + setTimestamp(mutation, timestamp); + } + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimer.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimer.java new file mode 100644 index 0000000000..211a0fd89c --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; + +/** + * {@code System#currentTimeMillis()} is not monotonic and using it as a source for {@link + * org.apache.hadoop.hbase.client.Mutation} timestamps can result in confusion and unexpected + * reordering of written versions. + * + *

This class provides a monotonically increasing value that is related to wall time. + * + *

Guava's {@link Stopwatch} is monotonic because it uses {@link System#nanoTime()} to measure + * passed time. + */ +public class MonotonicTimer { + private final long startingTimestampMillis; + private final Stopwatch stopwatch; + + public MonotonicTimer() { + this.startingTimestampMillis = System.currentTimeMillis(); + this.stopwatch = Stopwatch.createStarted(); + } + + public long getCurrentTimeMillis() { + return this.startingTimestampMillis + this.stopwatch.elapsed(TimeUnit.MILLISECONDS); + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/NoopTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/NoopTimestamper.java new file mode 100644 index 0000000000..44839fbbee --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/NoopTimestamper.java @@ -0,0 +1,47 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import com.google.api.core.InternalApi; +import java.util.List; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; + +@InternalApi("For internal use only") +public class NoopTimestamper implements Timestamper { + + @Override + public List fillTimestamp(List list) { + return list; + } + + @Override + public RowMutations fillTimestamp(RowMutations rowMutations) { + return rowMutations; + } + + @Override + public Put fillTimestamp(Put put) { + return put; + } + + @Override + public Mutation fillTimestamp(Mutation mutation) { + return mutation; + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampUtils.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampUtils.java new file mode 100644 index 0000000000..dde05ff30b --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampUtils.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Put; + +public class TimestampUtils { + public static void setPutTimestamp(Put put, long timestamp) { + for (Map.Entry> entry : put.getFamilyCellMap().entrySet()) { + for (Cell cell : entry.getValue()) { + try { + if (isTimestampNotSet(cell.getTimestamp())) { + CellUtil.setTimestamp(cell, timestamp); + } + } catch (IOException e) { + // IOException is thrown when `cell` does not implement `SettableTimestamp` and if it + // doesn't the we do not have any reliable way for setting the timestamp, thus we are just + // leaving it as-is. + // This shouldn't happen for vanilla `Put` instances. + throw new RuntimeException(e); + } + } + } + } + + private static boolean isTimestampNotSet(long timestamp) { + return timestamp == HConstants.LATEST_TIMESTAMP; + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/Timestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/Timestamper.java new file mode 100644 index 0000000000..923c5c5d81 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/Timestamper.java @@ -0,0 +1,39 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import com.google.api.core.InternalApi; +import java.util.List; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; + +/** + * Timestamper implementations are responsible for adding (or not) timestamps to {@link Put}s before + * they are sent to underlying databases. + */ +@InternalApi("For internal use only") +public interface Timestamper { + + List fillTimestamp(List list); + + RowMutations fillTimestamp(RowMutations rowMutations); + + Put fillTimestamp(Put put); + + Mutation fillTimestamp(Mutation mutation); +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestamperFactory.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestamperFactory.java new file mode 100644 index 0000000000..2d32884a00 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestamperFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +public class TimestamperFactory { + public static Timestamper create(TimestampingMode mode) { + switch (mode) { + case inplace: + return new InPlaceTimestamper(); + case copy: + return new CopyingTimestamper(); + default: + return new NoopTimestamper(); + } + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampingMode.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampingMode.java new file mode 100644 index 0000000000..6c5f40d48d --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestampingMode.java @@ -0,0 +1,22 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +public enum TimestampingMode { + disabled, + inplace, + copy, +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java index f5be6e44d2..69a2a9d37f 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java @@ -54,6 +54,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector; import io.opencensus.trace.Tracing; import java.io.IOException; @@ -92,6 +94,7 @@ public class TestMirroringMetrics { @Mock Table primaryTable; @Mock Table secondaryTable; @Mock FlowController flowController; + Timestamper timestamper = new NoopTimestamper(); @Mock MirroringMetricsRecorder mirroringMetricsRecorder; @@ -117,6 +120,7 @@ public void setUp() { new FailedMutationLogger( tracer, mock(Appender.class), mock(Serializer.class)))), new ReadSampler(100), + this.timestamper, false, false, tracer, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java index c755a0e7ca..c5941f669a 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java @@ -49,6 +49,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector.ScannerResultVerifier; @@ -114,6 +116,7 @@ public class TestMirroringTable { @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; @Mock ReferenceCounter referenceCounter; @Mock MirroringMetricsRecorder mirroringMetricsRecorder; + Timestamper timestamper = new NoopTimestamper(); MismatchDetector mismatchDetector; MirroringTable mirroringTable; @@ -137,6 +140,7 @@ public void setUp() { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, false, false, this.mirroringTracer, @@ -1267,6 +1271,7 @@ public void testConcurrentWritesAreFlowControlledBeforePrimaryAction() flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, performWritesConcurrently, waitForSecondaryWrites, this.mirroringTracer, @@ -1356,6 +1361,7 @@ private void setupConcurrentMirroringTableWithDirectExecutor() { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, performWritesConcurrently, waitForSecondaryWrites, this.mirroringTracer, @@ -1488,6 +1494,7 @@ public void testConcurrentOpsAreRunConcurrently() throws IOException, Interrupte flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, performWritesConcurrently, waitForSecondaryWrites, this.mirroringTracer, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java index 591ea45f16..8243dae352 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java @@ -34,6 +34,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; @@ -73,6 +75,7 @@ public class TestMirroringTableInputModification { @Mock MismatchDetector mismatchDetector; @Mock FlowController flowController; @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + Timestamper timestamper = new NoopTimestamper(); MirroringTable mirroringTable; SettableFuture secondaryOperationBlockingFuture; @@ -90,6 +93,7 @@ public void setUp() throws IOException, InterruptedException { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + timestamper, false, false, new MirroringTracer(), diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java index 23d4ca18a2..7da31203fd 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java @@ -35,6 +35,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import java.io.IOException; import java.util.Arrays; @@ -68,6 +70,7 @@ public class TestMirroringTableSynchronousMode { @Mock MismatchDetector mismatchDetector; @Mock FlowController flowController; @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + Timestamper timestamper = new NoopTimestamper(); MirroringTable mirroringTable; @@ -88,6 +91,7 @@ private void setupTable(boolean concurrent) { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, concurrent, true, new MirroringTracer(), diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java index 3d38952bb1..25c10af1e4 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java @@ -34,6 +34,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -61,6 +63,7 @@ @RunWith(JUnit4.class) public class TestVerificationSampling { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + Timestamper timestamper = new NoopTimestamper(); @Rule public final ExecutorServiceRule executorServiceRule = @@ -91,6 +94,7 @@ public void setUp() { flowController, secondaryWriteErrorConsumer, readSampler, + timestamper, false, false, new MirroringTracer(), diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java index 2f074bdc6d..070c38b6aa 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java @@ -33,6 +33,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; import java.util.Arrays; @@ -59,6 +61,8 @@ public class TestConcurrentMirroringBufferedMutator { @Rule public final ExecutorServiceRule executorServiceRule = ExecutorServiceRule.cachedPoolExecutor(); + Timestamper timestamper = new NoopTimestamper(); + public final MirroringBufferedMutatorCommon common = new MirroringBufferedMutatorCommon(); private final List singletonMutation1 = Collections.singletonList(common.mutation1); @@ -358,6 +362,7 @@ private BufferedMutator getBufferedMutator(long flushThreshold) throws IOExcepti common.bufferedMutatorParams, makeConfigurationWithFlushThreshold(flushThreshold), executorServiceRule.executorService, + timestamper, new MirroringTracer()); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java index b2cbbe24d5..66ff422960 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java @@ -22,6 +22,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.TestConnection; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.junit.Rule; @@ -34,6 +36,7 @@ @RunWith(JUnit4.class) public class TestMirroringBufferedMutator { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + Timestamper timestamper = new NoopTimestamper(); @Rule public final ExecutorServiceRule executorServiceRule = ExecutorServiceRule.cachedPoolExecutor(); @@ -60,6 +63,7 @@ public void testMirroringBufferedMutatorFactory() throws IOException { mutatorRule.flowController, executorServiceRule.executorService, mutatorRule.secondaryWriteErrorConsumerWithMetrics, + timestamper, new MirroringTracer())) .isInstanceOf(SequentialMirroringBufferedMutator.class); @@ -73,6 +77,7 @@ public void testMirroringBufferedMutatorFactory() throws IOException { mutatorRule.flowController, executorServiceRule.executorService, mutatorRule.secondaryWriteErrorConsumerWithMetrics, + timestamper, new MirroringTracer())) .isInstanceOf(ConcurrentMirroringBufferedMutator.class); } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java index 1a45624eea..2293e3e04e 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java @@ -29,6 +29,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.ExecutorServiceRule; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; @@ -60,6 +62,7 @@ @RunWith(JUnit4.class) public class TestSequentialMirroringBufferedMutator { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + Timestamper timestamper = new NoopTimestamper(); @Rule public final ExecutorServiceRule executorServiceRule = @@ -371,6 +374,7 @@ private BufferedMutator getBufferedMutator(long flushThreshold) throws IOExcepti common.flowController, executorServiceRule.executorService, common.secondaryWriteErrorConsumerWithMetrics, + timestamper, new MirroringTracer()); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestCopyingTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestCopyingTimestamper.java new file mode 100644 index 0000000000..08af7c6438 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestCopyingTimestamper.java @@ -0,0 +1,190 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TestInPlaceTimestamper.getCell; +import static com.google.common.truth.Truth.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TestCopyingTimestamper { + @Test + public void testFillingPutTimestamps() throws IOException { + Put inputPut = new Put("row".getBytes(StandardCharsets.UTF_8)); + inputPut.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + inputPut.addColumn("f1".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + inputPut.addImmutable("f2".getBytes(), "q1".getBytes(), "v".getBytes()); + inputPut.addImmutable("f2".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + + long timestampBefore = System.currentTimeMillis(); + Put resultPut = new CopyingTimestamper().fillTimestamp(inputPut); + long timestampAfter = System.currentTimeMillis(); + + // Input is not modified + assertThat(getCell(inputPut, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputPut, 1).getTimestamp()).isEqualTo(123L); + assertThat(getCell(inputPut, 2).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputPut, 3).getTimestamp()).isEqualTo(123L); + + // Result has assigned timestamps + assertThat(getCell(resultPut, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(resultPut, 0).getTimestamp()).isAtMost(timestampAfter); + + assertThat(getCell(resultPut, 1).getTimestamp()).isEqualTo(123L); + + assertThat(getCell(resultPut, 2).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(resultPut, 2).getTimestamp()).isAtMost(timestampAfter); + + assertThat(resultPut.get("f2".getBytes(), "q2".getBytes()).get(0).getTimestamp()) + .isEqualTo(123L); + } + + @Test + public void testFillingRowMutationsTimestamps() throws IOException { + RowMutations inputRowMutations = new RowMutations("row".getBytes()); + Put inputPut = new Put("row".getBytes(StandardCharsets.UTF_8)); + inputPut.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + inputPut.addColumn("f1".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + inputRowMutations.add(inputPut); + + Delete inputDelete = new Delete("row".getBytes(StandardCharsets.UTF_8)); + inputDelete.addColumn("f1".getBytes(), "q1".getBytes()); + inputDelete.addColumn("f1".getBytes(), "q2".getBytes(), 123L); + inputRowMutations.add(inputDelete); + + long timestampBefore = System.currentTimeMillis(); + RowMutations rm = new CopyingTimestamper().fillTimestamp(inputRowMutations); + long timestampAfter = System.currentTimeMillis(); + + // Input is not modified. + assertThat(getCell(inputRowMutations, 0, 0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputRowMutations, 0, 1).getTimestamp()).isEqualTo(123L); + + assertThat(getCell(inputRowMutations, 1, 0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputRowMutations, 1, 1).getTimestamp()).isEqualTo(123L); + + // Result has assigned timestamps. + assertThat(getCell(rm, 0, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(rm, 0, 0).getTimestamp()).isAtMost(timestampAfter); + assertThat(getCell(rm, 0, 1).getTimestamp()).isEqualTo(123L); + + assertThat(getCell(rm, 1, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(rm, 1, 1).getTimestamp()).isEqualTo(123L); + } + + @Test + public void testFillingListOfMutations() throws IOException { + Put inputPut = new Put("row".getBytes(StandardCharsets.UTF_8)); + inputPut.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + Delete inputDelete = new Delete("row".getBytes(StandardCharsets.UTF_8)); + inputDelete.addColumn("f1".getBytes(), "q1".getBytes()); + + Increment inputIncrement = new Increment("row".getBytes(StandardCharsets.UTF_8)); + inputIncrement.addColumn("f1".getBytes(), "q1".getBytes(), 1); + + Append inputAppend = new Append("row".getBytes(StandardCharsets.UTF_8)); + inputAppend.add("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + RowMutations inputRowMutations = new RowMutations("row".getBytes()); + Put inputRowMutationsPut = new Put("row".getBytes()); + inputRowMutationsPut.addColumn("f1".getBytes(), "q2".getBytes(), "v".getBytes()); + inputRowMutations.add(inputRowMutationsPut); + + Delete inputRowMutationsDelete = new Delete("row".getBytes()); + inputRowMutationsDelete.addColumn("f1".getBytes(), "q2".getBytes()); + inputRowMutations.add(inputRowMutationsDelete); + + long timestampBefore = System.currentTimeMillis(); + List result = + new CopyingTimestamper() + .fillTimestamp( + Arrays.asList( + inputPut, inputDelete, inputIncrement, inputAppend, inputRowMutations)); + long timestampAfter = System.currentTimeMillis(); + + // Input is not modified + assertThat(getCell(inputPut, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputDelete, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputIncrement, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputAppend, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputRowMutations, 0, 0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputRowMutations, 1, 0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + + // Result has assigned timestamps + Put resultPut = (Put) result.get(0); + Delete resultDelete = (Delete) result.get(1); + Increment resultIncrement = (Increment) result.get(2); + Append resultAppend = (Append) result.get(3); + RowMutations resultRowMutations = (RowMutations) result.get(4); + + assertThat(getCell(resultPut, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(resultPut, 0).getTimestamp()).isAtMost(timestampAfter); + assertThat(getCell(resultDelete, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(resultIncrement, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(resultAppend, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(resultRowMutations, 0, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(resultRowMutations, 0, 0).getTimestamp()).isAtMost(timestampAfter); + assertThat(getCell(resultRowMutations, 1, 0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + } + + @Test + public void testFillingListOfRowMutations() throws IOException { + Put p = new Put("row".getBytes(StandardCharsets.UTF_8)); + p.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + Delete d = new Delete("row".getBytes(StandardCharsets.UTF_8)); + d.addColumn("f1".getBytes(), "q1".getBytes()); + + RowMutations rm = new RowMutations("row".getBytes()); + rm.add(p); + rm.add(d); + + long timestampBefore = System.currentTimeMillis(); + List result = new CopyingTimestamper().fillTimestamp(Arrays.asList(rm)); + long timestampAfter = System.currentTimeMillis(); + + RowMutations resultRowMutations = result.get(0); + + assertThat(getCell(rm, 0, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(rm, 1, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + + assertThat(getCell(resultRowMutations, 0, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(resultRowMutations, 0, 0).getTimestamp()).isAtMost(timestampAfter); + + assertThat(getCell(resultRowMutations, 1, 0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestInPlaceTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestInPlaceTimestamper.java new file mode 100644 index 0000000000..85ce9f2bd4 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TestInPlaceTimestamper.java @@ -0,0 +1,190 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import static com.google.common.truth.Truth.assertThat; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TestInPlaceTimestamper { + @Test + public void testFillingPutTimestamps() throws IOException { + Put inputPut = new Put("row".getBytes(StandardCharsets.UTF_8)); + inputPut.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + inputPut.addColumn("f1".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + inputPut.addImmutable("f2".getBytes(), "q1".getBytes(), "v".getBytes()); + inputPut.addImmutable("f2".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + + long timestampBefore = System.currentTimeMillis(); + Put outputPut = new InPlaceTimestamper().fillTimestamp(inputPut); + long timestampAfter = System.currentTimeMillis(); + + assertRowEquals(outputPut, inputPut); + + assertThat(getCell(inputPut, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(inputPut, 0).getTimestamp()).isAtMost(timestampAfter); + + assertThat(getCell(inputPut, 1).getTimestamp()).isEqualTo(123L); + + assertThat(getCell(inputPut, 2).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(inputPut, 2).getTimestamp()).isAtMost(timestampAfter); + + assertThat(getCell(inputPut, 3).getTimestamp()).isEqualTo(123L); + } + + @Test + public void testFillingRowMutationsTimestamps() throws IOException { + RowMutations inputRowMutations = new RowMutations("row".getBytes()); + Put inputPut = new Put("row".getBytes(StandardCharsets.UTF_8)); + inputPut.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + inputPut.addColumn("f1".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + inputRowMutations.add(inputPut); + + Delete inputDelete = new Delete("row".getBytes(StandardCharsets.UTF_8)); + inputDelete.addColumn("f1".getBytes(), "q1".getBytes()); + inputDelete.addColumn("f1".getBytes(), "q2".getBytes(), 123L); + inputRowMutations.add(inputDelete); + + long timestampBefore = System.currentTimeMillis(); + RowMutations result = new InPlaceTimestamper().fillTimestamp(inputRowMutations); + long timestampAfter = System.currentTimeMillis(); + + assertRowEquals(inputRowMutations, result); + + assertThat(getCell(inputRowMutations, 0, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(inputRowMutations, 0, 0).getTimestamp()).isAtMost(timestampAfter); + assertThat(getCell(inputRowMutations, 0, 1).getTimestamp()).isEqualTo(123L); + + assertThat(getCell(inputRowMutations, 1, 0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputRowMutations, 1, 1).getTimestamp()).isEqualTo(123L); + } + + @Test + public void testFillingListOfMutations() throws IOException { + Put inputPut = new Put("row".getBytes(StandardCharsets.UTF_8)); + inputPut.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + Delete inputDelete = new Delete("row".getBytes(StandardCharsets.UTF_8)); + inputDelete.addColumn("f1".getBytes(), "q1".getBytes()); + + Increment inputIncrement = new Increment("row".getBytes(StandardCharsets.UTF_8)); + inputIncrement.addColumn("f1".getBytes(), "q1".getBytes(), 1); + + Append inputAppend = new Append("row".getBytes(StandardCharsets.UTF_8)); + inputAppend.add("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + RowMutations inputRowMutations = new RowMutations("row".getBytes()); + + List list = + Arrays.asList(inputPut, inputDelete, inputIncrement, inputAppend, inputRowMutations); + + long timestampBefore = System.currentTimeMillis(); + List result = new InPlaceTimestamper().fillTimestamp(list); + long timestampAfter = System.currentTimeMillis(); + + assertRecursiveEquals(list, result); + + assertThat(getCell(inputPut, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(inputPut, 0).getTimestamp()).isAtMost(timestampAfter); + + assertThat(getCell(inputDelete, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputIncrement, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(getCell(inputAppend, 0).getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + } + + @Test + public void testFillingListOfRowMutations() throws IOException { + Put inputPut = new Put("row".getBytes(StandardCharsets.UTF_8)); + inputPut.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + Delete inputDelete = new Delete("row".getBytes(StandardCharsets.UTF_8)); + inputDelete.addColumn("f1".getBytes(), "q1".getBytes()); + + RowMutations inputRowMutations = new RowMutations("row".getBytes()); + inputRowMutations.add(inputPut); + inputRowMutations.add(inputDelete); + + List input = Arrays.asList(inputRowMutations); + + long timestampBefore = System.currentTimeMillis(); + List result = new InPlaceTimestamper().fillTimestamp(input); + long timestampAfter = System.currentTimeMillis(); + + assertThat(result).isEqualTo(input); + assertRowMutationsEqual(result.get(0), input.get(0)); + + assertThat(getCell(inputPut, 0).getTimestamp()).isAtLeast(timestampBefore); + assertThat(getCell(inputPut, 0).getTimestamp()).isAtMost(timestampAfter); + } + + private void assertRecursiveEquals(List list, List result) { + assertThat(list).isEqualTo(result); + for (int i = 0; i < list.size(); i++) { + assertRowEquals(list.get(0), result.get(0)); + } + } + + private void assertRowEquals(Row row, Row row1) { + assertThat(row).isEqualTo(row1); + if (row instanceof RowMutations) { + assertRowMutationsEqual((RowMutations) row, (RowMutations) row1); + } + } + + private void assertRowMutationsEqual(RowMutations rm, RowMutations result) { + assertThat(rm).isEqualTo(result); + assertThat(rm.getMutations()).isEqualTo(result.getMutations()); + for (int i = 0; i < rm.getMutations().size(); i++) { + assertThat(rm.getMutations().get(i)).isEqualTo(result.getMutations().get(i)); + } + } + + public static Cell getCell(Mutation m, int id) throws IOException { + CellScanner cs = m.cellScanner(); + assertThat(cs.advance()).isTrue(); + for (int i = 0; i < id; i++) { + assertThat(cs.advance()).isTrue(); + } + return cs.current(); + } + + public static Cell getCell(RowMutations m, int mutationId, int id) throws IOException { + CellScanner cs = m.getMutations().get(mutationId).cellScanner(); + assertThat(cs.advance()).isTrue(); + for (int i = 0; i < id; i++) { + assertThat(cs.advance()).isTrue(); + } + return cs.current(); + } +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java index f1df0bcefd..8ce2d3b434 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java @@ -26,8 +26,8 @@ import com.google.cloud.bigtable.hbase.mirroring.utils.ConfigurationHelper; import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; -import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java index d60b449593..8d4f95d507 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java @@ -26,9 +26,9 @@ import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers; import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; -import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; import com.google.cloud.bigtable.hbase.mirroring.utils.PropagatingThread; +import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.mirroring.hbase1_x.ExecutorServiceRule; import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection; import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java index 5e25e2f635..c430e855c8 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java @@ -22,8 +22,8 @@ import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers; import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; -import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.TestWriteErrorConsumer; import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegion; import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegionRule; diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java index 98c890c571..0e8b83ea58 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java @@ -21,6 +21,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase2_x.utils.futures.FutureConverter; import java.util.ArrayList; import java.util.List; @@ -42,17 +43,20 @@ public class MirroringAsyncBufferedMutator implements AsyncBufferedMutator { private final ListenableReferenceCounter referenceCounter; private final SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Timestamper timestamper; public MirroringAsyncBufferedMutator( AsyncBufferedMutator primary, AsyncBufferedMutator secondary, FlowController flowController, - SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer) { + SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer, + Timestamper timestamper) { this.primary = primary; this.secondary = secondary; this.flowController = flowController; this.secondaryWriteErrorConsumer = secondaryWriteErrorConsumer; this.referenceCounter = new ListenableReferenceCounter(); + this.timestamper = timestamper; } @Override @@ -67,6 +71,7 @@ public Configuration getConfiguration() { @Override public CompletableFuture mutate(Mutation mutation) { + this.timestamper.fillTimestamp(mutation); referenceCounter.incrementReferenceCount(); CompletableFuture primaryCompleted = primary.mutate(mutation); CompletableFuture resultFuture = new CompletableFuture<>(); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java index 894054fc2d..bc4f316041 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java @@ -24,6 +24,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestamperFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; @@ -67,6 +69,7 @@ public class MirroringAsyncConnection implements AsyncConnection { private final AtomicBoolean closed = new AtomicBoolean(false); private final ReadSampler readSampler; private final ExecutorService executorService; + private final Timestamper timestamper; /** * The constructor called from {@link @@ -142,6 +145,9 @@ public MirroringAsyncConnection( this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate); this.executorService = Executors.newCachedThreadPool(); + this.timestamper = + TimestamperFactory.create( + this.configuration.mirroringOptions.enableDefaultClientSideTimestamps); } public AsyncConnection getPrimaryConnection() { @@ -291,6 +297,7 @@ public AsyncTable build() { secondaryWriteErrorConsumer, mirroringTracer, readSampler, + timestamper, referenceCounter, executorService, configuration.mirroringOptions.resultScannerBufferedMismatchedResults); @@ -404,7 +411,8 @@ public AsyncBufferedMutator build() { this.primaryMutatorBuilder.build(), this.secondaryMutatorBuilder.build(), flowController, - secondaryWriteErrorConsumer); + secondaryWriteErrorConsumer, + timestamper); } @Override diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java index 07d79489f2..683160a21b 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java @@ -35,6 +35,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory; import com.google.cloud.bigtable.mirroring.hbase2_x.utils.AsyncRequestScheduling.OperationStages; @@ -95,6 +96,7 @@ public class MirroringAsyncTable implements As private final ExecutorService executorService; private final RequestScheduler requestScheduler; private final int resultScannerBufferedMismatchedResults; + private final Timestamper timestamper; public MirroringAsyncTable( AsyncTable primaryTable, @@ -104,6 +106,7 @@ public MirroringAsyncTable( SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer, MirroringTracer mirroringTracer, ReadSampler readSampler, + Timestamper timestamper, ListenableReferenceCounter referenceCounter, ExecutorService executorService, int resultScannerBufferedMismatchedResults) { @@ -119,6 +122,7 @@ public MirroringAsyncTable( this.requestScheduler = new RequestScheduler(this.flowController, this.mirroringTracer, this.referenceCounter); this.resultScannerBufferedMismatchedResults = resultScannerBufferedMismatchedResults; + this.timestamper = timestamper; } @Override @@ -150,6 +154,7 @@ public CompletableFuture exists(Get get) { @Override public CompletableFuture put(Put put) { + this.timestamper.fillTimestamp(put); CompletableFuture primaryFuture = this.primaryTable.put(put); return writeWithFlowControl( new WriteOperationInfo(put), primaryFuture, () -> this.secondaryTable.put(put)) @@ -186,6 +191,7 @@ public CompletableFuture increment(Increment increment) { @Override public CompletableFuture mutateRow(RowMutations rowMutations) { + this.timestamper.fillTimestamp(rowMutations); CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations); return writeWithFlowControl( new WriteOperationInfo(rowMutations), @@ -257,6 +263,7 @@ OperationStages>> generalBatch( Function, GeneralBatchBuilder> batchBuilderCreator, Class successfulResultTypeClass) { + userActions = this.timestamper.fillTimestamp(userActions); OperationUtils.RewrittenIncrementAndAppendIndicesInfo actions = new OperationUtils.RewrittenIncrementAndAppendIndicesInfo<>(userActions); final int numActions = actions.operations.size(); @@ -565,6 +572,7 @@ private OperationStages> checkAndMutate( @Override public CompletableFuture thenPut(Put put) { + timestamper.fillTimestamp(put); return checkAndMutate( new WriteOperationInfo(put), this.primaryBuilder.thenPut(put), @@ -583,6 +591,7 @@ public CompletableFuture thenDelete(Delete delete) { @Override public CompletableFuture thenMutate(RowMutations rowMutations) { + timestamper.fillTimestamp(rowMutations); return checkAndMutate( new WriteOperationInfo(rowMutations), this.primaryBuilder.thenMutate(rowMutations), diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java index 5df9c91acd..481fe54466 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java @@ -84,6 +84,7 @@ public Table build() { flowController, secondaryWriteErrorConsumer, readSampler, + timestamper, performWritesConcurrently, waitForSecondaryWrites, mirroringTracer, diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java index 73c52807a6..0d3dd5f344 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java @@ -20,6 +20,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import java.io.IOException; import java.util.List; @@ -41,6 +42,7 @@ public MirroringTable( FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, ReadSampler readSampler, + Timestamper timestamper, boolean performWritesConcurrently, boolean waitForSecondaryWrites, MirroringTracer mirroringTracer, @@ -54,6 +56,7 @@ public MirroringTable( flowController, secondaryWriteErrorConsumer, readSampler, + timestamper, performWritesConcurrently, waitForSecondaryWrites, mirroringTracer, diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java index 3c7cb54a37..6628ca004d 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncBufferedMutator.java @@ -26,6 +26,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.SecondaryWriteErrorConsumerWithMetrics; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase2_x.utils.futures.FutureConverter; import java.io.IOException; import java.util.concurrent.CompletableFuture; @@ -50,6 +52,7 @@ public class TestMirroringAsyncBufferedMutator { @Mock AsyncBufferedMutator secondaryMutator; @Mock FlowController flowController; @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + Timestamper timestamper = new NoopTimestamper(); CompletableFuture primaryFuture; CompletableFuture secondaryCalled; @@ -63,7 +66,11 @@ public void setUp() { this.mirroringMutator = spy( new MirroringAsyncBufferedMutator( - primaryMutator, secondaryMutator, flowController, secondaryWriteErrorConsumer)); + primaryMutator, + secondaryMutator, + flowController, + secondaryWriteErrorConsumer, + timestamper)); this.put = new Put(Bytes.toBytes("rowKey")); put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("c1"), Bytes.toBytes("value")); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java index 32ff8d4089..b31162768d 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java @@ -44,6 +44,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Longs; @@ -102,6 +104,7 @@ public class TestMirroringAsyncTable { @Mock ListenableReferenceCounter referenceCounter; @Mock AsyncTable.CheckAndMutateBuilder primaryBuilder; @Mock ExecutorService executorService; + Timestamper timestamper = new NoopTimestamper(); MirroringAsyncTable mirroringTable; @@ -118,6 +121,7 @@ public void setUp() { secondaryWriteErrorConsumer, new MirroringTracer(), new ReadSampler(100), + timestamper, referenceCounter, executorService, 10)); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java index 5d01abf737..168e52a98a 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java @@ -32,6 +32,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; @@ -72,6 +74,7 @@ public class TestMirroringAsyncTableInputModification { @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; @Mock ListenableReferenceCounter referenceCounter; @Mock ExecutorService executorService; + Timestamper timestamper = new NoopTimestamper(); MirroringAsyncTable mirroringTable; CompletableFuture letPrimaryThroughFuture; @@ -91,6 +94,7 @@ public void setUp() { secondaryWriteErrorConsumer, new MirroringTracer(), new ReadSampler(100), + timestamper, referenceCounter, executorService, 10)); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java index b722a61029..2604a22bf8 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java @@ -32,6 +32,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.NoopTimestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -73,6 +75,7 @@ public class TestVerificationSampling { @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; @Mock ReadSampler readSampler; @Mock ListenableReferenceCounter referenceCounter; + Timestamper timestamper = new NoopTimestamper(); MirroringAsyncTable mirroringTable; @@ -92,6 +95,7 @@ public void setUp() { secondaryWriteErrorConsumer, new MirroringTracer(), readSampler, + timestamper, referenceCounter, executorServiceRule.executorService, 10)); diff --git a/quickstart.md b/quickstart.md index a38428a6bb..209d9e8728 100644 --- a/quickstart.md +++ b/quickstart.md @@ -131,6 +131,15 @@ In the concurrent mode writes are passed to both mutators at once. As in sequent Set `google.bigtable.mirroring.concurrent-writes` to `true` to enable concurrent Buffered Mutator mode (defaults to false). +## Client-side timestamping +HBase and Bigtable assign row version (timestamp) based on server-side time for mutations (unless version was explicitly assigned by the client). The Mirroring Client issues writes to underlying databases a few milliseconds apart and performing mutations without version assigned on the client side will cause inconsistencies between databases. To mitigate some of those issues a client-side timestamping is available in the Mirroring Client. When client-side timestamping is enabled the Mirroring Client will automatically add a timestamp based on client's machine's time to every `Put` object passed to the Mirroring Client. Client-side timestamps assigned by `Table`s and `BufferedMutator`s created by one `Connection` are always increasing, even if system clock is moved backwards, for example by NTP or manually by the user. +Be aware that client-side timestamping modifies only `Put`s - `Delete`s, `Increment`s and `Append`s are not affected by this setting and will cause inconsistencies between databases. +Client-side timestamping, if enabled, can use two modes - `inplace` and `copy`. `inplace` mode modifies provided `Put`s in-place, which is efficient but is not correct is the user reuses `Put` objects between calls. When `Put`s are reused the `copy` mode should be used - it will create a copy of each `Put` before assigning a timestamp and provided object can be safely reused in subsequent calls (please note that mutations passed to Mirroring Client are also used asynchronously and this safety guarantee is only provided if synchronous mode is enabled). +Client-side timestamping is enabled by default in `inplace` mode. +Use `google.bigtable.mirroring.enable-default-client-side-timestamps` property to disable it or change the mode. + +Place read a warning in `Caveats - Timestamps` section to decide which mode fits you use case best. + ## Configuration options - `google.bigtable.mirroring.primary-client.connection.impl` - a name of Connection class that should be used to connect to primary database. It is used as hbase.client.connection.impl when creating connection to primary database. Set to `default` to use default HBase connection class. Required. @@ -151,11 +160,13 @@ Set `google.bigtable.mirroring.concurrent-writes` to `true` to enable concurrent - `google.bigtable.mirroring.write-error-log.appender.drop-on-overflow` - used by DefaultAppender, whether to drop data if the thread flushing the data to disk is not keeping up or to block until it catches up. default: false. - `google.bigtable.mirroring.read-verification-rate-percent` - Integer value representing percentage of read operations performed on primary database that should be verified against secondary. Each call to `Table#get(Get)`, `Table#get(List)`, `Table#exists(Get)`, `Table#existsAll(List)`, `Table#batch(List, Object[])` (with overloads) and `Table#getScanner(Scan)` (with overloads) is counted as a single operation, independent of size of their arguments and results. Correct values are a integers ranging from 0 to 100 inclusive. default: 100. - `google.bigtable.mirroring.buffered-mutator.bytes-to-flush` - Number of bytes that `MirroringBufferedMutator` should buffer before flushing underlying primary BufferedMutator and executing a write to the secondary database. If not set the value of `hbase.client.write.buffer` is used, which by default is 2MB. When those values are kept in sync, the mirroring client should perform a flush operation on the primary BufferedMutator right after it schedules a new asynchronous write to the database. +- `google.bigtable.mirroring.enable-default-client-side-timestamps` - Select client-side timestamping mode. `disabled`, `inplace` and `copy` are only valid values. default: `inplace`. ## Caveats ### Timestamps -For ensuring full consistency between databases the user should always specify a timestamp for mutations issued using MirroringClient. Mutations without a timestamp will have one assigned by underlying database clients when the mutations are issued to underlying databases, what doesn't happen at the exact same instant for both databases. +Be aware that client-side timestamping modifies only `Put`s - `Delete`s, `Increment`s and `Append`s are not affected by this setting and will cause inconsistencies between databases. +Using client-side timestamping fixes inconsistencies caused by `Put`s, but can lead to **lost writes** if multiple machines are modifying the same cell and have clocks out of sync - writes from machine with correctly set time will be masked by puts from a machine with time in the future (clock out of sync), independent of the real order of those `Put`s. This problem wouldn't appear if client-side timestamping was disabled - timestamps would be assigned by the server and would reflect real ordering. ### Differences between Bigtable and HBase There are differences between HBase and Bigtable, please consult [this link](https://cloud.google.com/bigtable/docs/hbase-differences). Code using this client should be aware of them.