diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringTable.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringTable.java index b610ba4896..71bf760d27 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringTable.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringTable.java @@ -91,7 +91,7 @@ public void testPut() throws IOException { int databaseEntriesCount = 1000; final TableName tableName = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName)) { for (int i = 0; i < databaseEntriesCount; i++) { t1.put(Helpers.createPut(i, columnFamily1, qualifier1)); @@ -104,7 +104,7 @@ public void testPut() throws IOException { @Test public void testPuts() throws IOException { final TableName tableName = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName)) { int id = 0; for (int i = 0; i < 10; i++) { @@ -130,7 +130,7 @@ public void testPutWithPrimaryErrors() throws IOException { failEvenRowKeysPredicate, OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); final TableName tableName1 = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -149,7 +149,7 @@ public void run() throws IOException { databaseHelpers.verifyTableConsistency(tableName1); final TableName tableName2 = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName2)) { int id = 0; for (int i = 0; i < 100; i++) { @@ -182,7 +182,7 @@ public void testPutWithSecondaryErrors() throws IOException { final TableName tableName1 = connectionRule.createTable(columnFamily1); ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { t1.put(Helpers.createPut(i, columnFamily1, qualifier1)); @@ -195,7 +195,7 @@ public void testPutWithSecondaryErrors() throws IOException { ReportedErrorsContext reportedErrorsContext2 = new ReportedErrorsContext(); final TableName tableName2 = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName2)) { int id = 0; for (int i = 0; i < databaseEntriesCount / 100; i++) { @@ -217,7 +217,7 @@ public void testDelete() throws IOException { int databaseEntriesCount = 1000; final TableName tableName1 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -229,7 +229,7 @@ public void testDelete() throws IOException { final TableName tableName2 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName2, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName2)) { int id = 0; for (int i = 0; i < databaseEntriesCount / 100; i++) { @@ -264,7 +264,7 @@ public void testDeleteWithPrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation( failEvenRowKeysPredicate, OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -281,7 +281,7 @@ public void run() throws IOException { } databaseHelpers.verifyTableConsistency(tableName1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName2)) { int id = 0; for (int i = 0; i < databaseEntriesCount / 100; i++) { @@ -333,7 +333,7 @@ public void testDeleteWithSecondaryErrors() throws IOException { failEvenRowKeysPredicate, OperationStatusCode.BAD_FAMILY, "failed"); ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -347,7 +347,7 @@ public void testDeleteWithSecondaryErrors() throws IOException { reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); ReportedErrorsContext reportedErrorsContext2 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName2)) { int id = 0; for (int i = 0; i < databaseEntriesCount / 100; i++) { @@ -376,7 +376,7 @@ public void testCheckAndPut() throws IOException { final TableName tableName1 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { // We modify each row using for comparison a cell in its column qualifier1. @@ -449,7 +449,7 @@ public void testCheckAndPutPrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -493,7 +493,7 @@ public void testCheckAndPutSecondaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -543,7 +543,7 @@ public void testCheckAndDelete() throws IOException { qualifier4, qualifier5); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { // We delete each row using for comparison a cell in its column qualifier1. @@ -612,7 +612,7 @@ public void testCheckAndDeletePrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -657,7 +657,7 @@ public void testCheckAndDeleteSecondaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -697,7 +697,7 @@ public void testCheckAndMutate() throws IOException { final TableName tableName1 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -756,7 +756,7 @@ public void testCheckAndMutatePrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -803,7 +803,7 @@ public void testCheckAndMutateSecondaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -842,7 +842,7 @@ public void testIncrement() throws IOException { final TableName tableName1 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -853,7 +853,7 @@ public void testIncrement() throws IOException { databaseHelpers.verifyTableConsistency(tableName1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -875,7 +875,7 @@ public void testIncrementPrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -910,7 +910,7 @@ public void testIncrementSecondaryErrors() throws IOException { failEvenRowKeysPredicate, OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -930,7 +930,7 @@ public void testAppend() throws IOException { final TableName tableName1 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -941,7 +941,7 @@ public void testAppend() throws IOException { databaseHelpers.verifyTableConsistency(tableName1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -965,7 +965,7 @@ public void testAppendPrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -1001,7 +1001,7 @@ public void testAppendSecondaryErrors() throws IOException { failEvenRowKeysPredicate, OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table table = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -1019,7 +1019,7 @@ public void testGet() throws IOException { int databaseEntriesCount = 1000; final TableName tableName1 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -1042,7 +1042,7 @@ public void testGetWithPrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -1070,7 +1070,7 @@ public void testGetWithSecondaryErrors() throws IOException { databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -1090,7 +1090,7 @@ public void testExists() throws IOException { int databaseEntriesCount = 1000; final TableName tableName1 = connectionRule.createTable(columnFamily1); databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -1112,7 +1112,7 @@ public void testExistsWithPrimaryErrors() throws IOException { FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { final byte[] rowKey = rowKeyFromId(i); @@ -1140,7 +1140,7 @@ public void testExistsWithSecondaryErrors() throws IOException { databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName1)) { for (int i = 0; i < databaseEntriesCount; i++) { byte[] rowKey = rowKeyFromId(i); @@ -1160,7 +1160,7 @@ public void testBatch() throws IOException, InterruptedException { int databaseEntriesCount = 1000; final TableName tableName = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName)) { int id = 0; while (id < databaseEntriesCount) { @@ -1189,7 +1189,7 @@ public void testBatchWithPrimaryErrors() throws IOException, InterruptedExceptio FailingHBaseHRegion.failMutation(failEvenRowKeysPredicate, "failed"); final TableName tableName = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName)) { int id = 0; while (id < databaseEntriesCount) { @@ -1232,7 +1232,7 @@ public void testBatchWithSecondaryErrors() throws IOException, InterruptedExcept ReportedErrorsContext reportedErrorsContext1 = new ReportedErrorsContext(); final TableName tableName2 = connectionRule.createTable(columnFamily1); - try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (MirroringConnection connection = databaseHelpers.createTimestampedConnection()) { try (Table t1 = connection.getTable(tableName2)) { int id = 0; while (id < databaseEntriesCount) { diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/DatabaseHelpers.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/DatabaseHelpers.java index 29cda9f598..830d6608ea 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/DatabaseHelpers.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/DatabaseHelpers.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.hbase.mirroring.utils; +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -184,6 +185,12 @@ public MirroringConnection createConnection() throws IOException { return connectionRule.createConnection(this.executorServiceRule.executorService); } + public MirroringConnection createTimestampedConnection() throws IOException { + Configuration conf = ConfigurationHelper.newConfiguration(); + conf.setBoolean(MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, true); + return connectionRule.createConnection(this.executorServiceRule.executorService, conf); + } + public MirroringConnection createConnection(Configuration configuration) throws IOException { return connectionRule.createConnection(this.executorServiceRule.executorService, configuration); } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java index c6097bfb42..512927e0cb 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/Helpers.java @@ -43,7 +43,7 @@ public static Put createPut( public static Put createPut(int id, byte[] family, byte[] qualifier) { byte[] rowAndValue = Longs.toByteArray(id); - return createPut(rowAndValue, family, qualifier, id, rowAndValue); + return createPut(rowAndValue, family, qualifier, rowAndValue); } public static Get createGet(byte[] row, byte[] family, byte[] qualifier) { diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java index 9ae7634dd8..6f8f902e3d 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringConnection.java @@ -26,6 +26,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestamperFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -90,6 +92,7 @@ public class MirroringConnection implements Connection { protected final boolean performWritesConcurrently; protected final boolean waitForSecondaryWrites; + protected final Timestamper timestamper; /** * The constructor called from {@link @@ -162,6 +165,9 @@ public MirroringConnection(Configuration conf, boolean managed, ExecutorService this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate); this.performWritesConcurrently = this.configuration.mirroringOptions.performWritesConcurrently; this.waitForSecondaryWrites = this.configuration.mirroringOptions.waitForSecondaryWrites; + this.timestamper = + TimestamperFactory.create( + this.configuration.mirroringOptions.enableDefaultClientSideTimestamps); } @Override @@ -198,6 +204,7 @@ public Table call() throws IOException { this.flowController, this.secondaryWriteErrorConsumer, this.readSampler, + this.timestamper, this.performWritesConcurrently, this.waitForSecondaryWrites, this.mirroringTracer, @@ -223,6 +230,7 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams bufferedMutatorP flowController, executorService, secondaryWriteErrorConsumer, + timestamper, mirroringTracer); } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java index 74dd65fafc..9654bb08f2 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringOptions.java @@ -18,6 +18,7 @@ import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_BUFFERED_MUTATOR_BYTES_TO_FLUSH; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONCURRENT_WRITES; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_CONNECTION_CONNECTION_TERMINATION_TIMEOUT; +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_DROP_ON_OVERFLOW_KEY; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_MAX_BUFFER_SIZE_KEY; import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FAILLOG_PREFIX_PATH_KEY; @@ -106,6 +107,7 @@ public static class Faillog { public final Faillog faillog; public final int resultScannerBufferedMismatchedResults; + public final boolean enableDefaultClientSideTimestamps; public MirroringOptions(Configuration configuration) { this.mismatchDetectorFactoryClass = @@ -153,5 +155,8 @@ public MirroringOptions(Configuration configuration) { "Performing writes concurrently and not waiting for writes is forbidden. " + "It has no advantage over performing writes asynchronously and not waiting for them."); this.faillog = new Faillog(configuration); + + this.enableDefaultClientSideTimestamps = + configuration.getBoolean(MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, false); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java index 59c4e76b64..893ad21dd2 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/MirroringTable.java @@ -36,6 +36,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.HierarchicalReferenceCounter; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory; import com.google.common.annotations.VisibleForTesting; @@ -114,6 +115,7 @@ public boolean apply(@NullableDecl Object o) { private final AtomicBoolean closed = new AtomicBoolean(false); private final SettableFuture closedFuture = SettableFuture.create(); private final int resultScannerBufferedMismatchedResults; + private final Timestamper timestamper; /** * @param executorService ExecutorService is used to perform operations on secondaryTable and * verification tasks. @@ -129,6 +131,7 @@ public MirroringTable( FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, ReadSampler readSampler, + Timestamper timestamper, boolean performWritesConcurrently, boolean waitForSecondaryWrites, MirroringTracer mirroringTracer, @@ -148,6 +151,7 @@ public MirroringTable( this.mirroringTracer = mirroringTracer; this.requestScheduler = new RequestScheduler(flowController, this.mirroringTracer, this.referenceCounter); + this.timestamper = timestamper; this.batcher = new Batcher( this.primaryTable, @@ -156,6 +160,7 @@ public MirroringTable( this.secondaryWriteErrorConsumer, this.verificationContinuationFactory, this.readSampler, + this.timestamper, resultIsFaultyPredicate, waitForSecondaryWrites, performWritesConcurrently, @@ -545,6 +550,7 @@ private boolean checkAndMutateWithSpan( final byte[] value, final RowMutations rowMutations) throws IOException { + this.timestamper.fillTimestamp(rowMutations); boolean wereMutationsApplied = this.mirroringTracer.spanFactory.wrapPrimaryOperation( new CallableThrowingIOException() { diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java index 3430fe5c3a..5e69d9b876 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/ConcurrentMirroringBufferedMutator.java @@ -22,6 +22,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; import com.google.common.util.concurrent.FutureCallback; @@ -78,6 +79,7 @@ public ConcurrentMirroringBufferedMutator( BufferedMutatorParams bufferedMutatorParams, MirroringConfiguration configuration, ExecutorService executorService, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { super( @@ -86,6 +88,7 @@ public ConcurrentMirroringBufferedMutator( bufferedMutatorParams, configuration, executorService, + timestamper, mirroringTracer); } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java index c138355725..8fa6b57a78 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/MirroringBufferedMutator.java @@ -24,6 +24,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -60,6 +61,9 @@ */ @InternalApi("For internal usage only") public abstract class MirroringBufferedMutator implements BufferedMutator { + + private final Timestamper timestamper; + public static BufferedMutator create( boolean concurrent, Connection primaryConnection, @@ -69,6 +73,7 @@ public static BufferedMutator create( FlowController flowController, ExecutorService executorService, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { if (concurrent) { @@ -78,6 +83,7 @@ public static BufferedMutator create( bufferedMutatorParams, configuration, executorService, + timestamper, mirroringTracer); } else { return new SequentialMirroringBufferedMutator( @@ -88,6 +94,7 @@ public static BufferedMutator create( flowController, executorService, secondaryWriteErrorConsumer, + timestamper, mirroringTracer); } } @@ -121,6 +128,7 @@ public MirroringBufferedMutator( BufferedMutatorParams bufferedMutatorParams, MirroringConfiguration configuration, ExecutorService executorService, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { this.userListener = bufferedMutatorParams.getListener(); @@ -167,13 +175,16 @@ public void onException( this.mutationsBufferFlushThresholdBytes, this.ongoingFlushesCounter, this.mirroringTracer); + this.timestamper = timestamper; } @Override public void mutate(Mutation mutation) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope(HBaseOperation.BUFFERED_MUTATOR_MUTATE)) { - mutateScoped(Collections.singletonList(mutation)); + List m = Collections.singletonList(mutation); + timestamper.fillTimestamp(m); + mutateScoped(m); } } @@ -182,6 +193,7 @@ public void mutate(final List list) throws IOException { try (Scope scope = this.mirroringTracer.spanFactory.operationScope( HBaseOperation.BUFFERED_MUTATOR_MUTATE_LIST)) { + timestamper.fillTimestamp(list); mutateScoped(list); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java index 5b63133634..bbe1146250 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/SequentialMirroringBufferedMutator.java @@ -26,6 +26,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.collect.MapMaker; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -137,6 +138,7 @@ public SequentialMirroringBufferedMutator( FlowController flowController, ExecutorService executorService, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, + Timestamper timestamper, MirroringTracer mirroringTracer) throws IOException { super( @@ -145,6 +147,7 @@ public SequentialMirroringBufferedMutator( bufferedMutatorParams, configuration, executorService, + timestamper, mirroringTracer); this.secondaryWriteErrorConsumer = secondaryWriteErrorConsumer; this.flowController = flowController; diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java index 4051509f20..082f2f018f 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/Batcher.java @@ -29,6 +29,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -70,6 +71,7 @@ public class Batcher { private final boolean waitForSecondaryWrites; private final boolean performWritesConcurrently; private final MirroringTracer mirroringTracer; + private final Timestamper timestamper; public Batcher( Table primaryTable, @@ -78,6 +80,7 @@ public Batcher( SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, VerificationContinuationFactory verificationContinuationFactory, ReadSampler readSampler, + Timestamper timestamper, Predicate resultIsFaultyPredicate, boolean waitForSecondaryWrites, boolean performWritesConcurrently, @@ -92,6 +95,7 @@ public Batcher( this.waitForSecondaryWrites = waitForSecondaryWrites; this.performWritesConcurrently = performWritesConcurrently; this.mirroringTracer = mirroringTracer; + this.timestamper = timestamper; } public void batchSingleWriteOperation(Row operation) throws IOException { @@ -129,6 +133,7 @@ public void batch( final Object[] results, @Nullable final Callback callback) throws IOException, InterruptedException { + timestamper.fillTimestamp(inputOperations); final RewrittenIncrementAndAppendIndicesInfo actions = new RewrittenIncrementAndAppendIndicesInfo<>(inputOperations); Log.trace( diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java index a2748d38ce..104133a18b 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/MirroringConfigurationHelper.java @@ -30,6 +30,8 @@ import java.util.Objects; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.BufferedMutator; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -280,6 +282,21 @@ public class MirroringConfigurationHelper { public static final String MIRRORING_SCANNER_BUFFERED_MISMATCHED_READS = "google.bigtable.mirroring.result-scanner.buffered-mismatched-reads"; + /** + * If set to {@code true} the Mirroring Client will automatically add timestamps to {@link + * org.apache.hadoop.hbase.client.Put}s without timestamp set based on client's host local time. + * Client-side timestamps assigned by {@link Table}s and {@link BufferedMutator}`s created by one + * {@link Connection} are always increasing, even if system clock is moved backwards, for example + * by NTP or manually by the user. To enable client-side timestamping set this key to `true`. + * + *

Be aware that client-side timestamping modifies only `Put`s - `Delete`s, `Increment`s and + * `Append`s are not affected by this setting and will cause inconsistencies between databases. + * + *

Default value: false. + */ + public static final String MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS = + "google.bigtable.mirroring.enable-default-client-side-timestamps"; + public static void fillConnectionConfigWithClassImplementation( Configuration connectionConfig, Configuration config, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimer.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimer.java new file mode 100644 index 0000000000..211a0fd89c --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimer.java @@ -0,0 +1,43 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import com.google.common.base.Stopwatch; +import java.util.concurrent.TimeUnit; + +/** + * {@code System#currentTimeMillis()} is not monotonic and using it as a source for {@link + * org.apache.hadoop.hbase.client.Mutation} timestamps can result in confusion and unexpected + * reordering of written versions. + * + *

This class provides a monotonically increasing value that is related to wall time. + * + *

Guava's {@link Stopwatch} is monotonic because it uses {@link System#nanoTime()} to measure + * passed time. + */ +public class MonotonicTimer { + private final long startingTimestampMillis; + private final Stopwatch stopwatch; + + public MonotonicTimer() { + this.startingTimestampMillis = System.currentTimeMillis(); + this.stopwatch = Stopwatch.createStarted(); + } + + public long getCurrentTimeMillis() { + return this.startingTimestampMillis + this.stopwatch.elapsed(TimeUnit.MILLISECONDS); + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimestamper.java new file mode 100644 index 0000000000..3680d822d4 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/MonotonicTimestamper.java @@ -0,0 +1,84 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; + +public class MonotonicTimestamper implements Timestamper { + private final MonotonicTimer timer = new MonotonicTimer(); + + @Override + public void fillTimestamp(Put put) { + long timestamp = timer.getCurrentTimeMillis(); + setPutTimestamp(put, timestamp); + } + + @Override + public void fillTimestamp(RowMutations rowMutations) { + fillTimestamp(rowMutations.getMutations()); + } + + @Override + public void fillTimestamp(Iterable list) { + long timestamp = timer.getCurrentTimeMillis(); + setTimestamp(list, timestamp); + } + + private void setTimestamp(Iterable list, long timestamp) { + for (Row mutation : list) { + setTimestamp(mutation, timestamp); + } + } + + private void setTimestamp(Row row, long timestamp) { + if (row instanceof Put) { + setPutTimestamp((Put) row, timestamp); + } else if (row instanceof RowMutations) { + setTimestamp(((RowMutations) row).getMutations(), timestamp); + } + // Bigtable doesn't support timestamps for Increment and Append and only a specific subset of + // Deletes, let's not modify them. + } + + private void setPutTimestamp(Put put, long timestamp) { + for (Map.Entry> entry : put.getFamilyCellMap().entrySet()) { + for (Cell cell : entry.getValue()) { + try { + if (isTimestampNotSet(cell.getTimestamp())) { + CellUtil.setTimestamp(cell, timestamp); + } + } catch (IOException e) { + // IOException is thrown when `cell` does not implement `SettableTimestamp` and if it + // doesn't the we do not have any reliable way for setting the timestamp, thus we are just + // leaving it as-is. + // This shouldn't happen for vanilla `Put` instances. + } + } + } + } + + private boolean isTimestampNotSet(long timestamp) { + return timestamp == HConstants.LATEST_TIMESTAMP; + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/NoopTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/NoopTimestamper.java new file mode 100644 index 0000000000..9f288f8175 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/NoopTimestamper.java @@ -0,0 +1,31 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; + +public class NoopTimestamper implements Timestamper { + @Override + public void fillTimestamp(Iterable list) {} + + @Override + public void fillTimestamp(RowMutations rowMutations) {} + + @Override + public void fillTimestamp(Put put) {} +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/Timestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/Timestamper.java new file mode 100644 index 0000000000..834c5d72e3 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/Timestamper.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.RowMutations; + +public interface Timestamper { + + void fillTimestamp(Iterable list); + + void fillTimestamp(RowMutations rowMutations); + + void fillTimestamp(Put put); +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestamperFactory.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestamperFactory.java new file mode 100644 index 0000000000..8190535b83 --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/timestamper/TimestamperFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper; + +public class TimestamperFactory { + public static Timestamper create(boolean enableTimestamping) { + if (enableTimestamping) { + return new MonotonicTimestamper(); + } else { + return new NoopTimestamper(); + } + } +} diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java index f5be6e44d2..4759657caa 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringMetrics.java @@ -54,6 +54,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector; import io.opencensus.trace.Tracing; import java.io.IOException; @@ -92,6 +93,7 @@ public class TestMirroringMetrics { @Mock Table primaryTable; @Mock Table secondaryTable; @Mock FlowController flowController; + @Mock Timestamper timestamper; @Mock MirroringMetricsRecorder mirroringMetricsRecorder; @@ -117,6 +119,7 @@ public void setUp() { new FailedMutationLogger( tracer, mock(Appender.class), mock(Serializer.class)))), new ReadSampler(100), + this.timestamper, false, false, tracer, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java index c755a0e7ca..5af11d8fc7 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTable.java @@ -49,6 +49,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.DefaultMismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector.ScannerResultVerifier; @@ -114,6 +115,7 @@ public class TestMirroringTable { @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; @Mock ReferenceCounter referenceCounter; @Mock MirroringMetricsRecorder mirroringMetricsRecorder; + @Mock Timestamper timestamper; MismatchDetector mismatchDetector; MirroringTable mirroringTable; @@ -137,6 +139,7 @@ public void setUp() { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, false, false, this.mirroringTracer, @@ -1267,6 +1270,7 @@ public void testConcurrentWritesAreFlowControlledBeforePrimaryAction() flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, performWritesConcurrently, waitForSecondaryWrites, this.mirroringTracer, @@ -1356,6 +1360,7 @@ private void setupConcurrentMirroringTableWithDirectExecutor() { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, performWritesConcurrently, waitForSecondaryWrites, this.mirroringTracer, @@ -1488,6 +1493,7 @@ public void testConcurrentOpsAreRunConcurrently() throws IOException, Interrupte flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, performWritesConcurrently, waitForSecondaryWrites, this.mirroringTracer, diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java index 591ea45f16..75ef5fc6a3 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableInputModification.java @@ -34,6 +34,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; @@ -73,6 +74,7 @@ public class TestMirroringTableInputModification { @Mock MismatchDetector mismatchDetector; @Mock FlowController flowController; @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + @Mock Timestamper timestamper; MirroringTable mirroringTable; SettableFuture secondaryOperationBlockingFuture; @@ -90,6 +92,7 @@ public void setUp() throws IOException, InterruptedException { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + timestamper, false, false, new MirroringTracer(), diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java index 23d4ca18a2..1721cb52fb 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestMirroringTableSynchronousMode.java @@ -35,6 +35,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import java.io.IOException; import java.util.Arrays; @@ -68,6 +69,7 @@ public class TestMirroringTableSynchronousMode { @Mock MismatchDetector mismatchDetector; @Mock FlowController flowController; @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; + @Mock Timestamper timestamper; MirroringTable mirroringTable; @@ -88,6 +90,7 @@ private void setupTable(boolean concurrent) { flowController, secondaryWriteErrorConsumer, new ReadSampler(100), + this.timestamper, concurrent, true, new MirroringTracer(), diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java index 3d38952bb1..39996a69a1 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/TestVerificationSampling.java @@ -34,6 +34,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -61,6 +62,7 @@ @RunWith(JUnit4.class) public class TestVerificationSampling { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Mock Timestamper timestamper; @Rule public final ExecutorServiceRule executorServiceRule = @@ -91,6 +93,7 @@ public void setUp() { flowController, secondaryWriteErrorConsumer, readSampler, + timestamper, false, false, new MirroringTracer(), diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java index 2f074bdc6d..1d720b5641 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestConcurrentMirroringBufferedMutator.java @@ -33,6 +33,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.RequestResourcesDescription; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; import java.util.Arrays; @@ -48,6 +49,7 @@ import org.junit.runners.JUnit4; import org.mockito.ArgumentMatchers; import org.mockito.InOrder; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -59,6 +61,8 @@ public class TestConcurrentMirroringBufferedMutator { @Rule public final ExecutorServiceRule executorServiceRule = ExecutorServiceRule.cachedPoolExecutor(); + @Mock Timestamper timestamper; + public final MirroringBufferedMutatorCommon common = new MirroringBufferedMutatorCommon(); private final List singletonMutation1 = Collections.singletonList(common.mutation1); @@ -358,6 +362,7 @@ private BufferedMutator getBufferedMutator(long flushThreshold) throws IOExcepti common.bufferedMutatorParams, makeConfigurationWithFlushThreshold(flushThreshold), executorServiceRule.executorService, + timestamper, new MirroringTracer()); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java index b2cbbe24d5..16b0a81377 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestMirroringBufferedMutator.java @@ -22,18 +22,21 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.TestConnection; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @RunWith(JUnit4.class) public class TestMirroringBufferedMutator { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Mock Timestamper timestamper; @Rule public final ExecutorServiceRule executorServiceRule = ExecutorServiceRule.cachedPoolExecutor(); @@ -60,6 +63,7 @@ public void testMirroringBufferedMutatorFactory() throws IOException { mutatorRule.flowController, executorServiceRule.executorService, mutatorRule.secondaryWriteErrorConsumerWithMetrics, + timestamper, new MirroringTracer())) .isInstanceOf(SequentialMirroringBufferedMutator.class); @@ -73,6 +77,7 @@ public void testMirroringBufferedMutatorFactory() throws IOException { mutatorRule.flowController, executorServiceRule.executorService, mutatorRule.secondaryWriteErrorConsumerWithMetrics, + timestamper, new MirroringTracer())) .isInstanceOf(ConcurrentMirroringBufferedMutator.class); } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java index 1a45624eea..a21e531dc4 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/bufferedmutator/TestSequentialMirroringBufferedMutator.java @@ -29,6 +29,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.ExecutorServiceRule; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; @@ -51,6 +52,7 @@ import org.junit.runners.JUnit4; import org.mockito.ArgumentMatchers; import org.mockito.InOrder; +import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnit; @@ -60,6 +62,7 @@ @RunWith(JUnit4.class) public class TestSequentialMirroringBufferedMutator { @Rule public final MockitoRule mockitoRule = MockitoJUnit.rule(); + @Mock Timestamper timestamper; @Rule public final ExecutorServiceRule executorServiceRule = @@ -371,6 +374,7 @@ private BufferedMutator getBufferedMutator(long flushThreshold) throws IOExcepti common.flowController, executorServiceRule.executorService, common.secondaryWriteErrorConsumerWithMetrics, + timestamper, new MirroringTracer()); } } diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/TestMonotonicTimestamper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/TestMonotonicTimestamper.java new file mode 100644 index 0000000000..4ced0261fe --- /dev/null +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase1_x/utils/TestMonotonicTimestamper.java @@ -0,0 +1,149 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.mirroring.hbase1_x.utils; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.MonotonicTimestamper; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Append; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RowMutations; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TestMonotonicTimestamper { + @Test + public void testFillingPutTimestamps() { + Put p = new Put("row".getBytes(StandardCharsets.UTF_8)); + p.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + p.addColumn("f1".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + p.addImmutable("f2".getBytes(), "q1".getBytes(), "v".getBytes()); + p.addImmutable("f2".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + + long timestampBefore = System.currentTimeMillis(); + new MonotonicTimestamper().fillTimestamp(p); + long timestampAfter = System.currentTimeMillis(); + + assertThat(p.get("f1".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtLeast(timestampBefore); + assertThat(p.get("f1".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtMost(timestampAfter); + + assertThat(p.get("f1".getBytes(), "q2".getBytes()).get(0).getTimestamp()).isEqualTo(123L); + + assertThat(p.get("f2".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtLeast(timestampBefore); + assertThat(p.get("f2".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtMost(timestampAfter); + + assertThat(p.get("f2".getBytes(), "q2".getBytes()).get(0).getTimestamp()).isEqualTo(123L); + } + + @Test + public void testFillingRowMutationsTimestamps() throws IOException { + RowMutations rm = new RowMutations("row".getBytes()); + Put p = new Put("row".getBytes(StandardCharsets.UTF_8)); + p.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + p.addColumn("f1".getBytes(), "q2".getBytes(), 123L, "v".getBytes()); + rm.add(p); + + Delete d = new Delete("row".getBytes(StandardCharsets.UTF_8)); + d.addColumn("f1".getBytes(), "q1".getBytes()); + d.addColumn("f1".getBytes(), "q2".getBytes(), 123L); + rm.add(d); + + long timestampBefore = System.currentTimeMillis(); + new MonotonicTimestamper().fillTimestamp(rm); + long timestampAfter = System.currentTimeMillis(); + + CellScanner cs0 = rm.getMutations().get(0).cellScanner(); + cs0.advance(); + assertThat(cs0.current().getTimestamp()).isAtLeast(timestampBefore); + assertThat(cs0.current().getTimestamp()).isAtMost(timestampAfter); + cs0.advance(); + assertThat(cs0.current().getTimestamp()).isEqualTo(123L); + + CellScanner cs1 = rm.getMutations().get(1).cellScanner(); + cs1.advance(); + assertThat(cs1.current().getTimestamp()).isEqualTo(HConstants.LATEST_TIMESTAMP); + cs1.advance(); + assertThat(cs1.current().getTimestamp()).isEqualTo(123L); + } + + @Test + public void testFillingListOfMutations() throws IOException { + Put p = new Put("row".getBytes(StandardCharsets.UTF_8)); + p.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + Delete d = new Delete("row".getBytes(StandardCharsets.UTF_8)); + d.addColumn("f1".getBytes(), "q1".getBytes()); + + Increment i = new Increment("row".getBytes(StandardCharsets.UTF_8)); + i.addColumn("f1".getBytes(), "q1".getBytes(), 1); + + Append a = new Append("row".getBytes(StandardCharsets.UTF_8)); + a.add("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + RowMutations rm = new RowMutations("row".getBytes()); + + long timestampBefore = System.currentTimeMillis(); + new MonotonicTimestamper().fillTimestamp(Arrays.asList(p, d, i, a, rm)); + long timestampAfter = System.currentTimeMillis(); + + assertThat(p.get("f1".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtLeast(timestampBefore); + assertThat(p.get("f1".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtMost(timestampAfter); + + assertThat(d.getFamilyCellMap().values().iterator().next().get(0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(i.getFamilyCellMap().values().iterator().next().get(0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + assertThat(a.getFamilyCellMap().values().iterator().next().get(0).getTimestamp()) + .isEqualTo(HConstants.LATEST_TIMESTAMP); + } + + @Test + public void testFillingListOfRowMutations() throws IOException { + Put p = new Put("row".getBytes(StandardCharsets.UTF_8)); + p.addColumn("f1".getBytes(), "q1".getBytes(), "v".getBytes()); + + Delete d = new Delete("row".getBytes(StandardCharsets.UTF_8)); + d.addColumn("f1".getBytes(), "q1".getBytes()); + + RowMutations rm = new RowMutations("row".getBytes()); + rm.add(p); + rm.add(d); + + long timestampBefore = System.currentTimeMillis(); + new MonotonicTimestamper().fillTimestamp(Arrays.asList(rm)); + long timestampAfter = System.currentTimeMillis(); + + assertThat(p.get("f1".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtLeast(timestampBefore); + assertThat(p.get("f1".getBytes(), "q1".getBytes()).get(0).getTimestamp()) + .isAtMost(timestampAfter); + } +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java index f1df0bcefd..8ce2d3b434 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java @@ -26,8 +26,8 @@ import com.google.cloud.bigtable.hbase.mirroring.utils.ConfigurationHelper; import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; -import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java index d60b449593..8d4f95d507 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java @@ -26,9 +26,9 @@ import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers; import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; -import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; import com.google.cloud.bigtable.hbase.mirroring.utils.PropagatingThread; +import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.mirroring.hbase1_x.ExecutorServiceRule; import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection; import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java index 5e25e2f635..071206881b 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.hbase.mirroring; +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS; import static com.google.common.truth.Truth.assertThat; import com.google.cloud.bigtable.hbase.mirroring.utils.AsyncConnectionRule; @@ -22,8 +23,8 @@ import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers; import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; -import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.TestMismatchDetectorCounter; import com.google.cloud.bigtable.hbase.mirroring.utils.TestWriteErrorConsumer; import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegion; import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegionRule; @@ -82,6 +83,10 @@ public class TestMirroringAsyncTable { public static final Configuration config = ConfigurationHelper.newConfiguration(); + static { + config.setBoolean(MIRRORING_ENABLE_DEFAULT_CLIENT_SIDE_TIMESTAMPS, true); + } + static final byte[] columnFamily1 = "cf1".getBytes(); static final byte[] qualifier1 = "cq1".getBytes(); static final byte[] qualifier2 = "cq2".getBytes(); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java index 98c890c571..79f1486a97 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncBufferedMutator.java @@ -67,6 +67,7 @@ public Configuration getConfiguration() { @Override public CompletableFuture mutate(Mutation mutation) { + // TODO: add timestamps referenceCounter.incrementReferenceCount(); CompletableFuture primaryCompleted = primary.mutate(mutation); CompletableFuture resultFuture = new CompletableFuture<>(); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java index 894054fc2d..2f81474e37 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java @@ -24,6 +24,8 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.TimestamperFactory; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; @@ -67,6 +69,7 @@ public class MirroringAsyncConnection implements AsyncConnection { private final AtomicBoolean closed = new AtomicBoolean(false); private final ReadSampler readSampler; private final ExecutorService executorService; + private final Timestamper timestamper; /** * The constructor called from {@link @@ -142,6 +145,9 @@ public MirroringAsyncConnection( this.readSampler = new ReadSampler(this.configuration.mirroringOptions.readSamplingRate); this.executorService = Executors.newCachedThreadPool(); + this.timestamper = + TimestamperFactory.create( + this.configuration.mirroringOptions.enableDefaultClientSideTimestamps); } public AsyncConnection getPrimaryConnection() { @@ -291,6 +297,7 @@ public AsyncTable build() { secondaryWriteErrorConsumer, mirroringTracer, readSampler, + timestamper, referenceCounter, executorService, configuration.mirroringOptions.resultScannerBufferedMismatchedResults); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java index 07d79489f2..95bfabe03a 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncTable.java @@ -35,6 +35,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.VerificationContinuationFactory; import com.google.cloud.bigtable.mirroring.hbase2_x.utils.AsyncRequestScheduling.OperationStages; @@ -95,6 +96,7 @@ public class MirroringAsyncTable implements As private final ExecutorService executorService; private final RequestScheduler requestScheduler; private final int resultScannerBufferedMismatchedResults; + private final Timestamper timestamper; public MirroringAsyncTable( AsyncTable primaryTable, @@ -104,6 +106,7 @@ public MirroringAsyncTable( SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer, MirroringTracer mirroringTracer, ReadSampler readSampler, + Timestamper timestamper, ListenableReferenceCounter referenceCounter, ExecutorService executorService, int resultScannerBufferedMismatchedResults) { @@ -119,6 +122,7 @@ public MirroringAsyncTable( this.requestScheduler = new RequestScheduler(this.flowController, this.mirroringTracer, this.referenceCounter); this.resultScannerBufferedMismatchedResults = resultScannerBufferedMismatchedResults; + this.timestamper = timestamper; } @Override @@ -150,6 +154,7 @@ public CompletableFuture exists(Get get) { @Override public CompletableFuture put(Put put) { + this.timestamper.fillTimestamp(put); CompletableFuture primaryFuture = this.primaryTable.put(put); return writeWithFlowControl( new WriteOperationInfo(put), primaryFuture, () -> this.secondaryTable.put(put)) @@ -186,6 +191,7 @@ public CompletableFuture increment(Increment increment) { @Override public CompletableFuture mutateRow(RowMutations rowMutations) { + this.timestamper.fillTimestamp(rowMutations); CompletableFuture primaryFuture = this.primaryTable.mutateRow(rowMutations); return writeWithFlowControl( new WriteOperationInfo(rowMutations), @@ -257,6 +263,7 @@ OperationStages>> generalBatch( Function, GeneralBatchBuilder> batchBuilderCreator, Class successfulResultTypeClass) { + this.timestamper.fillTimestamp(userActions); OperationUtils.RewrittenIncrementAndAppendIndicesInfo actions = new OperationUtils.RewrittenIncrementAndAppendIndicesInfo<>(userActions); final int numActions = actions.operations.size(); @@ -565,6 +572,7 @@ private OperationStages> checkAndMutate( @Override public CompletableFuture thenPut(Put put) { + timestamper.fillTimestamp(put); return checkAndMutate( new WriteOperationInfo(put), this.primaryBuilder.thenPut(put), @@ -583,6 +591,7 @@ public CompletableFuture thenDelete(Delete delete) { @Override public CompletableFuture thenMutate(RowMutations rowMutations) { + timestamper.fillTimestamp(rowMutations); return checkAndMutate( new WriteOperationInfo(rowMutations), this.primaryBuilder.thenMutate(rowMutations), diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java index 5df9c91acd..481fe54466 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringConnection.java @@ -84,6 +84,7 @@ public Table build() { flowController, secondaryWriteErrorConsumer, readSampler, + timestamper, performWritesConcurrently, waitForSecondaryWrites, mirroringTracer, diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java index 73c52807a6..0d3dd5f344 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringTable.java @@ -20,6 +20,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import java.io.IOException; import java.util.List; @@ -41,6 +42,7 @@ public MirroringTable( FlowController flowController, SecondaryWriteErrorConsumer secondaryWriteErrorConsumer, ReadSampler readSampler, + Timestamper timestamper, boolean performWritesConcurrently, boolean waitForSecondaryWrites, MirroringTracer mirroringTracer, @@ -54,6 +56,7 @@ public MirroringTable( flowController, secondaryWriteErrorConsumer, readSampler, + timestamper, performWritesConcurrently, waitForSecondaryWrites, mirroringTracer, diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java index 32ff8d4089..a661cc84e6 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java @@ -44,6 +44,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringSpanConstants.HBaseOperation; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.collect.ImmutableList; import com.google.common.primitives.Longs; @@ -102,6 +103,7 @@ public class TestMirroringAsyncTable { @Mock ListenableReferenceCounter referenceCounter; @Mock AsyncTable.CheckAndMutateBuilder primaryBuilder; @Mock ExecutorService executorService; + @Mock Timestamper timestamper; MirroringAsyncTable mirroringTable; @@ -118,6 +120,7 @@ public void setUp() { secondaryWriteErrorConsumer, new MirroringTracer(), new ReadSampler(100), + timestamper, referenceCounter, executorService, 10)); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java index 5d01abf737..3dc233e70d 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTableInputModification.java @@ -32,6 +32,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.util.concurrent.SettableFuture; import java.util.ArrayList; @@ -72,6 +73,7 @@ public class TestMirroringAsyncTableInputModification { @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; @Mock ListenableReferenceCounter referenceCounter; @Mock ExecutorService executorService; + @Mock Timestamper timestamper; MirroringAsyncTable mirroringTable; CompletableFuture letPrimaryThroughFuture; @@ -91,6 +93,7 @@ public void setUp() { secondaryWriteErrorConsumer, new MirroringTracer(), new ReadSampler(100), + timestamper, referenceCounter, executorService, 10)); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java index b722a61029..87b2444616 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestVerificationSampling.java @@ -32,6 +32,7 @@ import com.google.cloud.bigtable.mirroring.hbase1_x.utils.flowcontrol.FlowController; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.mirroringmetrics.MirroringTracer; import com.google.cloud.bigtable.mirroring.hbase1_x.utils.referencecounting.ListenableReferenceCounter; +import com.google.cloud.bigtable.mirroring.hbase1_x.utils.timestamper.Timestamper; import com.google.cloud.bigtable.mirroring.hbase1_x.verification.MismatchDetector; import com.google.common.collect.ImmutableList; import java.io.IOException; @@ -73,6 +74,7 @@ public class TestVerificationSampling { @Mock SecondaryWriteErrorConsumerWithMetrics secondaryWriteErrorConsumer; @Mock ReadSampler readSampler; @Mock ListenableReferenceCounter referenceCounter; + @Mock Timestamper timestamper; MirroringAsyncTable mirroringTable; @@ -92,6 +94,7 @@ public void setUp() { secondaryWriteErrorConsumer, new MirroringTracer(), readSampler, + timestamper, referenceCounter, executorServiceRule.executorService, 10)); diff --git a/quickstart.md b/quickstart.md index a38428a6bb..80bb876fc6 100644 --- a/quickstart.md +++ b/quickstart.md @@ -131,6 +131,11 @@ In the concurrent mode writes are passed to both mutators at once. As in sequent Set `google.bigtable.mirroring.concurrent-writes` to `true` to enable concurrent Buffered Mutator mode (defaults to false). +## Client-side timestamping +HBase and Bigtable assign row version (timestamp) based on server-side time for mutations without version assigned by the client. The Mirroring Client issues writes to underlying databases a few milliseconds apart and performing mutations without version assigned on the client side will cause inconsistencies between databases. To mitigate some of those issues a client-side timestamping is available in the Mirroring Client. When client-side timestamping is enabled the Mirroring Client will automatically add a timestamp based on client's local time to every `Put` object passed to the Mirroring Client. Client-side timestamps assigned by `Table`s and `BufferedMutator`s created by one `Connection` are always increasing, even if system clock is moved backwards, for example by NTP or manually by the user. +Be aware that client-side timestamping modifies only `Put`s - `Delete`s, `Increment`s and `Append`s are not affected by this setting and will cause inconsistencies between databases. +To enable client-side timestamping set `google.bigtable.mirroring.enable-default-client-side-timestamps` to `true`. + ## Configuration options - `google.bigtable.mirroring.primary-client.connection.impl` - a name of Connection class that should be used to connect to primary database. It is used as hbase.client.connection.impl when creating connection to primary database. Set to `default` to use default HBase connection class. Required. @@ -151,11 +156,12 @@ Set `google.bigtable.mirroring.concurrent-writes` to `true` to enable concurrent - `google.bigtable.mirroring.write-error-log.appender.drop-on-overflow` - used by DefaultAppender, whether to drop data if the thread flushing the data to disk is not keeping up or to block until it catches up. default: false. - `google.bigtable.mirroring.read-verification-rate-percent` - Integer value representing percentage of read operations performed on primary database that should be verified against secondary. Each call to `Table#get(Get)`, `Table#get(List)`, `Table#exists(Get)`, `Table#existsAll(List)`, `Table#batch(List, Object[])` (with overloads) and `Table#getScanner(Scan)` (with overloads) is counted as a single operation, independent of size of their arguments and results. Correct values are a integers ranging from 0 to 100 inclusive. default: 100. - `google.bigtable.mirroring.buffered-mutator.bytes-to-flush` - Number of bytes that `MirroringBufferedMutator` should buffer before flushing underlying primary BufferedMutator and executing a write to the secondary database. If not set the value of `hbase.client.write.buffer` is used, which by default is 2MB. When those values are kept in sync, the mirroring client should perform a flush operation on the primary BufferedMutator right after it schedules a new asynchronous write to the database. +- `google.bigtable.mirroring.enable-default-client-side-timestamps` - Enable client-side timestamping. default: false. ## Caveats ### Timestamps -For ensuring full consistency between databases the user should always specify a timestamp for mutations issued using MirroringClient. Mutations without a timestamp will have one assigned by underlying database clients when the mutations are issued to underlying databases, what doesn't happen at the exact same instant for both databases. +Be aware that client-side timestamping modifies only `Put`s - `Delete`s, `Increment`s and `Append`s are not affected by this setting and will cause inconsistencies between databases. ### Differences between Bigtable and HBase There are differences between HBase and Bigtable, please consult [this link](https://cloud.google.com/bigtable/docs/hbase-differences). Code using this client should be aware of them.