From 1dac7e4cd141d91b06187e8d3fc5c8b6c6864419 Mon Sep 17 00:00:00 2001 From: Adam Czajkowski Date: Fri, 15 Oct 2021 13:45:25 +0200 Subject: [PATCH] test: 2.x mirroring integration tests --- .../mirroring/utils/ConfigurationHelper.java | 4 + .../hbase/mirroring/utils/ConnectionRule.java | 10 +- .../pom.xml | 3 +- .../regionserver/FailingHBaseHRegion2.java | 31 + .../pom.xml | 456 ++++++ .../hbase/MavenPlaceholderIntegration2x.java | 23 + .../hbase/mirroring/IntegrationTests.java | 33 + .../hbase/mirroring/TestBlocking.java | 154 ++ .../hbase/mirroring/TestErrorDetection.java | 306 ++++ .../mirroring/TestMirroringAsyncTable.java | 1363 +++++++++++++++++ .../mirroring/utils/AsyncConnectionRule.java | 42 + .../bigtable-to-hbase-local-configuration.xml | 66 + .../hbase-to-bigtable-local-configuration.xml | 66 + .../src/test/resources/log4j.properties | 7 + .../src/test/resources/prometheus.yml | 13 + .../hbase2_x/MirroringAsyncConnection.java | 10 +- .../hbase2_x/TestMirroringAsyncTable.java | 33 + .../pom.xml | 1 + 18 files changed, 2615 insertions(+), 6 deletions(-) create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/pom.xml create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/main/java/com/google/cloud/bigtable/hbase/MavenPlaceholderIntegration2x.java create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/IntegrationTests.java create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/AsyncConnectionRule.java create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/bigtable-to-hbase-local-configuration.xml create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/hbase-to-bigtable-local-configuration.xml create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/log4j.properties create mode 100644 bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/prometheus.yml diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConfigurationHelper.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConfigurationHelper.java index d239d49dbb..c627e701b5 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConfigurationHelper.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConfigurationHelper.java @@ -70,6 +70,10 @@ private static void fillDefaults(Configuration configuration) { "hbase.client.connection.impl", "com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection"); + configuration.setIfUnset( + "hbase.client.async.connection.impl", + "com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection"); + configuration.setIfUnset( "google.bigtable.mirroring.mismatch-detector.impl", TestMismatchDetector.class.getCanonicalName()); diff --git a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConnectionRule.java b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConnectionRule.java index 2d6e44a409..124a8598c3 100644 --- a/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConnectionRule.java +++ b/bigtable-hbase-mirroring-client-1.x-parent/bigtable-hbase-mirroring-client-1.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/ConnectionRule.java @@ -51,14 +51,18 @@ public MirroringConnection createConnection(ExecutorService executorService) thr public MirroringConnection createConnection( ExecutorService executorService, Configuration configuration) throws IOException { - if (baseMiniCluster != null) { - baseMiniCluster.updateConfigurationWithHbaseMiniClusterProps(configuration); - } + updateConfigurationWithHbaseMiniClusterProps(configuration); Connection conn = ConnectionFactory.createConnection(configuration, executorService); return (MirroringConnection) conn; } + public void updateConfigurationWithHbaseMiniClusterProps(Configuration configuration) { + if (baseMiniCluster != null) { + baseMiniCluster.updateConfigurationWithHbaseMiniClusterProps(configuration); + } + } + @Override protected void after() { if (baseMiniCluster != null) { diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/pom.xml b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/pom.xml index 7b9709c4ad..941702a8d5 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/pom.xml +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/pom.xml @@ -315,7 +315,7 @@ limitations under the License. com.google.cloud.bigtable bigtable-hbase-mirroring-client-1.x-integration-tests - 2.0.0-alpha2-SNAPSHOT + 2.0.0-alpha2-SNAPSHOT test test-jar @@ -328,7 +328,6 @@ limitations under the License. test - diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java index 8fa8601e69..75ffe6fe33 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-1.x-2.x-integration-tests/src/test/java/org/apache/hadoop/hbase/regionserver/FailingHBaseHRegion2.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Append; @@ -35,6 +36,8 @@ import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.wal.WAL; /** @@ -120,4 +123,32 @@ public Result append(Append mutation, long nonceGroup, long nonce) throws IOExce processRowThrow(mutation.getRow()); return super.append(mutation, nonceGroup, nonce); } + + @Override + public boolean checkAndMutate( + byte[] row, + byte[] family, + byte[] qualifier, + CompareOperator op, + ByteArrayComparable comparator, + TimeRange timeRange, + Mutation mutation) + throws IOException { + processRowThrow(row); + return super.checkAndMutate(row, family, qualifier, op, comparator, timeRange, mutation); + } + + @Override + public boolean checkAndRowMutate( + byte[] row, + byte[] family, + byte[] qualifier, + CompareOperator op, + ByteArrayComparable comparator, + TimeRange timeRange, + RowMutations rm) + throws IOException { + processRowThrow(row); + return super.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm); + } } diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/pom.xml b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/pom.xml new file mode 100644 index 0000000000..b62bcb0323 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/pom.xml @@ -0,0 +1,456 @@ + + + + 4.0.0 + + + com.google.cloud.bigtable + bigtable-hbase-mirroring-client-2.x-parent + 2.0.0-alpha2-SNAPSHOT + + + bigtable-hbase-mirroring-client-2.x-integration-tests + jar + ${project.groupId}:${project.artifactId} + + This project contains test cases that ought to work for either bigtable-hbase or hbase proper. + + + + ${hbase2.version} + com.google.cloud.bigtable.hbase2_x.BigtableConnection + org.apache.hadoop.hbase.client.BigtableAsyncConnection + 1800 + + + + + HBase2ToBigtableLocalIntegrationTests + + + + ${project.groupId} + bigtable-emulator-maven-plugin + 2.0.0-alpha2-SNAPSHOT + + + + start + stop + + + bigtable.emulator.endpoint + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + integration-tests + + integration-test + verify + + + + **/IntegrationTests.java + + + + hbase-to-bigtable-local-configuration.xml + com.google.cloud.bigtable.hbase.mirroring.utils.compat.TableCreator2x + org.apache.hadoop.hbase.regionserver.FailingHBaseHRegion2 + + + + ${bigtable.emulator.endpoint} + + + true + + + 1 + ${test.timeout} + + + true + + + + ${project.build.directory}/failsafe-reports/integration-tests/failsafe-summary.xml + + ${project.build.directory}/failsafe-reports/integration-tests + + + + + + + + + + BigtableToHBase2LocalIntegrationTests + + + + ${project.groupId} + bigtable-emulator-maven-plugin + 2.0.0-alpha2-SNAPSHOT + + + + start + stop + + + bigtable.emulator.endpoint + + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + + integration-tests + + integration-test + verify + + + + **/IntegrationTests.java + + + + bigtable-to-hbase-local-configuration.xml + com.google.cloud.bigtable.hbase.mirroring.utils.compat.TableCreator2x + org.apache.hadoop.hbase.regionserver.FailingHBaseHRegion2 + + + + ${bigtable.emulator.endpoint} + + + true + + + 1 + ${test.timeout} + + + true + + + + ${project.build.directory}/failsafe-reports/integration-tests/failsafe-summary.xml + + ${project.build.directory}/failsafe-reports/integration-tests + + + + + + + + + + + + + + com.google.cloud + google-cloud-bigtable-bom + ${bigtable.version} + pom + import + + + + com.google.cloud + google-cloud-bigtable-deps-bom + ${bigtable.version} + pom + import + + + + + + + + com.google.cloud.bigtable + bigtable-hbase-mirroring-client-2.x + 2.0.0-alpha2-SNAPSHOT + test + + + + ${project.groupId} + bigtable-hbase-2.x + 2.0.0-alpha2-SNAPSHOT + test + + + + org.apache.hbase + hbase-shaded-client + + + + + + + com.google.cloud + google-cloud-bigtable + ${bigtable.version} + test + + + + + org.apache.hbase + hbase-shaded-testing-util + ${hbase2.version} + test + + + + + + com.google.code.findbugs + jsr305 + ${jsr305.version} + test + + + + com.google.guava + guava + 30.1.1-android + test + + + + commons-lang + commons-lang + ${commons-lang.version} + test + + + + io.opencensus + opencensus-impl + 0.28.0 + + + io.opencensus + opencensus-exporter-trace-zipkin + 0.28.0 + + + io.opencensus + opencensus-exporter-stats-prometheus + 0.28.0 + + + io.prometheus + simpleclient_httpserver + 0.3.0 + + + + + junit + junit + ${junit.version} + test + + + + org.junit.platform + junit-platform-launcher + 1.6.2 + test + + + com.google.truth + truth + 1.1.2 + test + + + org.apache.logging.log4j + log4j-api + 2.14.1 + test + + + org.apache.logging.log4j + log4j-core + 2.14.1 + test + + + com.google.cloud.bigtable + bigtable-hbase-mirroring-client-1.x-integration-tests + 2.0.0-alpha2-SNAPSHOT + test + test-jar + + + org.apache.hbase + * + + + + + com.google.cloud.bigtable + bigtable-hbase-mirroring-client-1.x-2.x-integration-tests + 2.0.0-alpha2-SNAPSHOT + test-jar + test + + + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.14.1 + test + + + + + + org.slf4j + slf4j-api + 1.7.30 + test + + + org.slf4j + slf4j-log4j12 + 1.7.30 + test + + + + + io.dropwizard.metrics + metrics-core + 3.2.6 + test + + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 3.0.0-M1 + + true + + + + org.sonatype.plugins + nexus-staging-maven-plugin + + true + + + + org.apache.maven.plugins + maven-site-plugin + + true + + + + org.apache.maven.plugins + maven-source-plugin + + true + + + + org.apache.maven.plugins + maven-javadoc-plugin + + true + + + + org.apache.maven.plugins + maven-gpg-plugin + + true + + + + org.codehaus.mojo + clirr-maven-plugin + + true + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + default-test + test + + test + + + false + + **/*.java + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + + diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/main/java/com/google/cloud/bigtable/hbase/MavenPlaceholderIntegration2x.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/main/java/com/google/cloud/bigtable/hbase/MavenPlaceholderIntegration2x.java new file mode 100644 index 0000000000..6ea5aee245 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/main/java/com/google/cloud/bigtable/hbase/MavenPlaceholderIntegration2x.java @@ -0,0 +1,23 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.hbase; + +/* + * This is a placeholder src/main class which is a workaround to run the maven-jar-plugin. + */ +class MavenPlaceholderIntegration2x { + private MavenPlaceholderIntegration2x() {} +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/IntegrationTests.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/IntegrationTests.java new file mode 100644 index 0000000000..de53f37941 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/IntegrationTests.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.hbase.mirroring; + +import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; +import org.junit.ClassRule; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +@RunWith(Suite.class) +@Suite.SuiteClasses({ + TestMirroringAsyncTable.class, + TestBlocking.class, + TestErrorDetection.class, +}) +public class IntegrationTests { + // Classes in test suites should use their own ConnectionRule, the one here serves to keep a + // single HBase MiniCluster connection up for all tests (if one is needed). + @ClassRule public static ConnectionRule connectionRule = new ConnectionRule(); +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java new file mode 100644 index 0000000000..f8627c45fc --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestBlocking.java @@ -0,0 +1,154 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.hbase.mirroring; + +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS; +import static com.google.cloud.bigtable.mirroring.hbase1_x.utils.MirroringConfigurationHelper.MIRRORING_MISMATCH_DETECTOR_CLASS; +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.bigtable.hbase.mirroring.utils.AsyncConnectionRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.ConfigurationHelper; +import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers; +import com.google.cloud.bigtable.hbase.mirroring.utils.ExecutorServiceRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; +import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounter; +import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.SlowMismatchDetector; +import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.Get; +import org.junit.Assume; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; + +public class TestBlocking { + static final byte[] columnFamily1 = "cf1".getBytes(); + static final byte[] qualifier1 = "q1".getBytes(); + @ClassRule public static ConnectionRule connectionRule = new ConnectionRule(); + + @ClassRule + public static AsyncConnectionRule asyncConnectionRule = new AsyncConnectionRule(connectionRule); + + @Rule + public MismatchDetectorCounterRule mismatchDetectorCounterRule = + new MismatchDetectorCounterRule(); + + @Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); + public DatabaseHelpers databaseHelpers = new DatabaseHelpers(connectionRule, executorServiceRule); + + @Test + public void testConnectionCloseBlocksUntilAllRequestsHaveBeenVerified() throws IOException { + long beforeConnectionClose; + long afterConnectionClose; + + Configuration config = ConfigurationHelper.newConfiguration(); + config.set(MIRRORING_MISMATCH_DETECTOR_CLASS, SlowMismatchDetector.class.getCanonicalName()); + SlowMismatchDetector.sleepTime = 1000; + + TableName tableName = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName); + for (int i = 0; i < 10; i++) { + Get get = new Get("1".getBytes()); + get.addColumn(columnFamily1, qualifier1); + t.get(get); + } + beforeConnectionClose = System.currentTimeMillis(); + } + afterConnectionClose = System.currentTimeMillis(); + long connectionCloseDuration = afterConnectionClose - beforeConnectionClose; + assertThat(connectionCloseDuration).isGreaterThan(900); + assertThat(MismatchDetectorCounter.getInstance().getVerificationsStartedCounter()) + .isEqualTo(10); + assertThat(MismatchDetectorCounter.getInstance().getVerificationsFinishedCounter()) + .isEqualTo(10); + } + + @Test + public void testSlowSecondaryConnection() + throws IOException, ExecutionException, InterruptedException { + // TODO(mwalkiewicz): fix BigtableToHBase2 + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + Configuration config = ConfigurationHelper.newConfiguration(); + config.set(MIRRORING_MISMATCH_DETECTOR_CLASS, SlowMismatchDetector.class.getCanonicalName()); + SlowMismatchDetector.sleepTime = 100; + config.set(MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS, "10"); + TableName tableName = connectionRule.createTable(columnFamily1); + byte[] row = "1".getBytes(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + asyncConnection + .getTable(tableName) + .put(Helpers.createPut(row, columnFamily1, qualifier1, "1".getBytes())) + .get(); + } + + long startTime; + long endTime; + long duration; + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + startTime = System.currentTimeMillis(); + AsyncTable t = asyncConnection.getTable(tableName); + for (int i = 0; i < 1000; i++) { + t.get(Helpers.createGet(row, columnFamily1, qualifier1)); + } + } + endTime = System.currentTimeMillis(); + duration = endTime - startTime; + // 1000 requests * 100 ms / 10 concurrent requests + assertThat(duration).isGreaterThan(10000); + + config.set(MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS, "50"); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + startTime = System.currentTimeMillis(); + AsyncTable t = asyncConnection.getTable(tableName); + for (int i = 0; i < 1000; i++) { + t.get(Helpers.createGet(row, columnFamily1, qualifier1)); + } + } + endTime = System.currentTimeMillis(); + duration = endTime - startTime; + // 1000 requests * 100 ms / 50 concurrent requests + assertThat(duration).isGreaterThan(2000); + + config.set(MIRRORING_FLOW_CONTROLLER_MAX_OUTSTANDING_REQUESTS, "1000"); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + startTime = System.currentTimeMillis(); + AsyncTable t = asyncConnection.getTable(tableName); + for (int i = 0; i < 1000; i++) { + t.get(Helpers.createGet(row, columnFamily1, qualifier1)); + } + } + endTime = System.currentTimeMillis(); + duration = endTime - startTime; + // 1000 requests * 100 ms / 1000 concurrent requests + assertThat(duration).isLessThan(1000); + } +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java new file mode 100644 index 0000000000..395a842114 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestErrorDetection.java @@ -0,0 +1,306 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.hbase.mirroring; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.bigtable.hbase.mirroring.utils.AsyncConnectionRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.ConfigurationHelper; +import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers; +import com.google.cloud.bigtable.hbase.mirroring.utils.ExecutorServiceRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; +import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounter; +import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.PropagatingThread; +import com.google.cloud.bigtable.mirroring.hbase1_x.MirroringConnection; +import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; +import com.google.common.primitives.Longs; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Table; +import org.junit.Assume; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TestErrorDetection { + static final byte[] columnFamily1 = "cf1".getBytes(); + static final byte[] qualifier1 = "q1".getBytes(); + @ClassRule public static ConnectionRule connectionRule = new ConnectionRule(); + + @ClassRule + public static AsyncConnectionRule asyncConnectionRule = new AsyncConnectionRule(connectionRule); + + @Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); + + @Rule + public MismatchDetectorCounterRule mismatchDetectorCounterRule = + new MismatchDetectorCounterRule(); + + public DatabaseHelpers databaseHelpers = new DatabaseHelpers(connectionRule, executorServiceRule); + + public static Configuration config = ConfigurationHelper.newConfiguration(); + + @Test + public void readsAndWritesArePerformed() + throws IOException, ExecutionException, InterruptedException { + final TableName tableName = connectionRule.createTable(columnFamily1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + asyncConnection + .getTable(tableName) + .put(Helpers.createPut("1".getBytes(), columnFamily1, qualifier1, "1".getBytes())) + .get(); + } + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + Result result = + asyncConnection + .getTable(tableName) + .get(Helpers.createGet("1".getBytes(), columnFamily1, qualifier1)) + .get(); + assertArrayEquals(result.getRow(), "1".getBytes()); + assertArrayEquals(result.getValue(columnFamily1, qualifier1), "1".getBytes()); + assertEquals(MismatchDetectorCounter.getInstance().getErrorCount(), 0); + } + } + + @Test + public void mismatchIsDetected() throws IOException, InterruptedException, ExecutionException { + final TableName tableName = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + asyncConnection + .getPrimaryConnection() + .getTable(tableName) + .put(Helpers.createPut("1".getBytes(), columnFamily1, qualifier1, "1".getBytes())) + .get(); + } + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + asyncConnection + .getSecondaryConnection() + .getTable(tableName) + .put(Helpers.createPut("1".getBytes(), columnFamily1, qualifier1, "2".getBytes())) + .get(); + } + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + Result result = + asyncConnection + .getTable(tableName) + .get(Helpers.createGet("1".getBytes(), columnFamily1, qualifier1)) + .get(); + // Data from primary is returned. + assertArrayEquals(result.getRow(), "1".getBytes()); + assertArrayEquals(result.getValue(columnFamily1, qualifier1), "1".getBytes()); + } + + assertEquals(1, MismatchDetectorCounter.getInstance().getErrorCount()); + } + + @Test + public void concurrentInsertionAndReadingInsertsWithScanner() + throws IOException, InterruptedException, TimeoutException { + + class WorkerThread extends PropagatingThread { + private final long workerId; + private final long batchSize = 100; + private final AsyncConnection connection; + private final TableName tableName; + private final long entriesPerWorker; + private final long numberOfBatches; + + public WorkerThread( + int workerId, AsyncConnection connection, TableName tableName, long numberOfBatches) { + this.workerId = workerId; + this.connection = connection; + this.entriesPerWorker = numberOfBatches * batchSize; + this.numberOfBatches = numberOfBatches; + this.tableName = tableName; + } + + @Override + public void performTask() throws Throwable { + AsyncTable table = this.connection.getTable(tableName); + for (long batchId = 0; batchId < this.numberOfBatches; batchId++) { + List puts = new ArrayList<>(); + for (long batchEntryId = 0; batchEntryId < this.batchSize; batchEntryId++) { + long putIndex = + this.workerId * this.entriesPerWorker + batchId * this.batchSize + batchEntryId; + long putValue = putIndex + 1; + byte[] putIndexBytes = Longs.toByteArray(putIndex); + byte[] putValueBytes = Longs.toByteArray(putValue); + puts.add( + Helpers.createPut( + putIndexBytes, columnFamily1, qualifier1, putValue, putValueBytes)); + } + CompletableFuture.allOf(table.put(puts).toArray(new CompletableFuture[0])).get(); + } + } + } + + final int numberOfWorkers = 100; + final int numberOfBatches = 100; + + final TableName tableName = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection connection = asyncConnectionRule.createAsyncConnection(config)) { + List workers = new ArrayList<>(); + for (int i = 0; i < numberOfWorkers; i++) { + PropagatingThread worker = new WorkerThread(i, connection, tableName, numberOfBatches); + worker.start(); + workers.add(worker); + } + + for (PropagatingThread worker : workers) { + worker.propagatingJoin(60000); + } + } + + try (MirroringAsyncConnection connection = asyncConnectionRule.createAsyncConnection(config)) { + try (ResultScanner s = connection.getTable(tableName).getScanner(columnFamily1, qualifier1)) { + long counter = 0; + for (Result r : s) { + long row = Longs.fromByteArray(r.getRow()); + long value = Longs.fromByteArray(r.getValue(columnFamily1, qualifier1)); + assertEquals(counter, row); + assertEquals(counter + 1, value); + counter += 1; + } + } + } + + assertEquals( + MismatchDetectorCounter.getInstance().getErrorsAsString(), + 0, + MismatchDetectorCounter.getInstance().getErrorCount()); + } + + @Test + public void conditionalMutationsPreserveConsistency() throws IOException, TimeoutException { + // TODO(mwalkiewicz): fix BigtableToHBase2 + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + final int numberOfOperations = 50; + final int numberOfWorkers = 100; + + final byte[] canary = "canary-value".getBytes(); + + class WorkerThread extends PropagatingThread { + private final long workerId; + private final AsyncConnection connection; + private final TableName tableName; + + public WorkerThread(int workerId, AsyncConnection connection, TableName tableName) { + this.workerId = workerId; + this.connection = connection; + this.tableName = tableName; + } + + @Override + public void performTask() throws Throwable { + AsyncTable table = this.connection.getTable(tableName); + byte[] row = String.format("r%s", workerId).getBytes(); + table.put(Helpers.createPut(row, columnFamily1, qualifier1, 0, "0".getBytes())); + for (int i = 0; i < numberOfOperations; i++) { + byte[] currentValue = String.valueOf(i).getBytes(); + byte[] nextValue = String.valueOf(i + 1).getBytes(); + assertFalse( + table + .checkAndMutate(row, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.NOT_EQUAL, currentValue) + .thenPut(Helpers.createPut(row, columnFamily1, qualifier1, i, canary)) + .get()); + assertTrue( + table + .checkAndMutate(row, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.EQUAL, currentValue) + .thenPut(Helpers.createPut(row, columnFamily1, qualifier1, i, nextValue)) + .get()); + } + } + } + + final TableName tableName = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection connection = asyncConnectionRule.createAsyncConnection(config)) { + List workers = new ArrayList<>(); + for (int i = 0; i < numberOfWorkers; i++) { + PropagatingThread worker = new WorkerThread(i, connection, tableName); + worker.start(); + workers.add(worker); + } + + for (PropagatingThread worker : workers) { + worker.propagatingJoin(30000); + } + } + + try (MirroringConnection connection = databaseHelpers.createConnection()) { + try (Table t = connection.getTable(tableName)) { + try (ResultScanner s = t.getScanner(columnFamily1, qualifier1)) { + int counter = 0; + for (Result r : s) { + assertEquals( + new String(r.getRow(), Charset.defaultCharset()), + String.valueOf(numberOfOperations), + new String(r.getValue(columnFamily1, qualifier1), Charset.defaultCharset())); + counter++; + } + assertEquals(numberOfWorkers, counter); + } + } + } + + assertEquals( + numberOfWorkers + 1, // because null returned from the scanner is also verified. + MismatchDetectorCounter.getInstance().getVerificationsStartedCounter()); + assertEquals( + numberOfWorkers + 1, + MismatchDetectorCounter.getInstance().getVerificationsFinishedCounter()); + assertEquals( + MismatchDetectorCounter.getInstance().getErrorsAsString(), + 0, + MismatchDetectorCounter.getInstance().getErrorCount()); + } +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java new file mode 100644 index 0000000000..4f2b733f97 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/TestMirroringAsyncTable.java @@ -0,0 +1,1363 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.hbase.mirroring; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.bigtable.hbase.mirroring.utils.AsyncConnectionRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.ConfigurationHelper; +import com.google.cloud.bigtable.hbase.mirroring.utils.ConnectionRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.DatabaseHelpers; +import com.google.cloud.bigtable.hbase.mirroring.utils.ExecutorServiceRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.Helpers; +import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounter; +import com.google.cloud.bigtable.hbase.mirroring.utils.MismatchDetectorCounterRule; +import com.google.cloud.bigtable.hbase.mirroring.utils.TestWriteErrorConsumer; +import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegion; +import com.google.cloud.bigtable.hbase.mirroring.utils.failinghbaseminicluster.FailingHBaseHRegionRule; +import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import com.google.common.primitives.Longs; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer; +import org.apache.hadoop.hbase.client.AsyncTable; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.junit.Assume; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class TestMirroringAsyncTable { + @ClassRule public static ConnectionRule connectionRule = new ConnectionRule(); + + @ClassRule + public static AsyncConnectionRule asyncConnectionRule = new AsyncConnectionRule(connectionRule); + + @Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); + + @Rule public FailingHBaseHRegionRule failingHBaseHRegionRule = new FailingHBaseHRegionRule(); + + @Rule + public MismatchDetectorCounterRule mismatchDetectorCounterRule = + new MismatchDetectorCounterRule(); + + final Predicate failPredicate = + (bytes) -> bytes.length == 8 && Longs.fromByteArray(bytes) % 2 == 0; + + public DatabaseHelpers databaseHelpers = new DatabaseHelpers(connectionRule, executorServiceRule); + + public static final Configuration config = ConfigurationHelper.newConfiguration(); + + static final byte[] columnFamily1 = "cf1".getBytes(); + static final byte[] qualifier1 = "cq1".getBytes(); + static final byte[] qualifier2 = "cq2".getBytes(); + static final byte[] qualifier3 = "cq3".getBytes(); + static final byte[] qualifier4 = "cq4".getBytes(); + static final byte[] qualifier5 = "cq5".getBytes(); + + public static byte[] rowKeyFromId(int id) { + return Longs.toByteArray(id); + } + + @Test + public void testPut() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> putFutures = + IntStream.range(0, databaseEntriesCount) + .mapToObj(i -> t.put(Helpers.createPut(i, columnFamily1, qualifier1))) + .collect(Collectors.toList()); + + CompletableFuture.allOf(putFutures.toArray(new CompletableFuture[0])).get(); + } + databaseHelpers.verifyTableConsistency(tableName1); + + final TableName tableName2 = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List>> putBatches = new ArrayList<>(); + int id = 0; + for (int i = 0; i < 10; i++) { + List puts = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + puts.add(Helpers.createPut(id, columnFamily1, qualifier1)); + id++; + } + putBatches.add(t.put(puts)); + } + CompletableFuture.allOf( + putBatches.stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + .toArray(new CompletableFuture[0])) + .get(); + } + databaseHelpers.verifyTableConsistency(tableName2); + } + + @Test + public void testPutWithPrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> putFutures = + IntStream.range(0, databaseEntriesCount) + .mapToObj(i -> t.put(Helpers.createPut(i, columnFamily1, qualifier1))) + .collect(Collectors.toList()); + CompletableFuture.allOf(putFutures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < putFutures.size(); i++) { + checkIfShouldHaveThrown(putFutures.get(i), rowKeyFromId(i)); + } + } + databaseHelpers.verifyTableConsistency(tableName1); + + final TableName tableName2 = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List>> putBatches = new ArrayList<>(); + int id = 0; + for (int i = 0; i < 100; i++) { + List puts = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + puts.add(Helpers.createPut(id, columnFamily1, qualifier1)); + id++; + } + putBatches.add(t.put(puts)); + } + List> flatFutures = + putBatches.stream().flatMap(List::stream).collect(Collectors.toList()); + CompletableFuture.allOf(flatFutures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < flatFutures.size(); i++) { + checkIfShouldHaveThrown(flatFutures.get(i), rowKeyFromId(i)); + } + } + databaseHelpers.verifyTableConsistency(tableName2); + } + + @Test + public void testPutWithSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> putFutures = + IntStream.range(0, databaseEntriesCount) + .mapToObj(i -> t.put(Helpers.createPut(i, columnFamily1, qualifier1))) + .collect(Collectors.toList()); + + CompletableFuture.allOf(putFutures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + } + databaseHelpers.verifyTableConsistency(tableName1, failPredicate); + + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext2 = + new TestMirroringTable.ReportedErrorsContext(); + final TableName tableName2 = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List>> putBatches = new ArrayList<>(); + int id = 0; + for (int i = 0; i < 10; i++) { + List puts = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + puts.add(Helpers.createPut(id, columnFamily1, qualifier1)); + id++; + } + putBatches.add(t.put(puts)); + } + CompletableFuture.allOf( + putBatches.stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + .toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + } + + databaseHelpers.verifyTableConsistency(tableName2, failPredicate); + reportedErrorsContext2.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testDelete() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> deleteFutures = + IntStream.range(0, databaseEntriesCount) + .mapToObj( + i -> t.delete(Helpers.createDelete(rowKeyFromId(i), columnFamily1, qualifier1))) + .collect(Collectors.toList()); + + CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).get(); + } + databaseHelpers.verifyTableConsistency(tableName1); + + final TableName tableName2 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName2, databaseEntriesCount, columnFamily1, qualifier1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List>> deleteBatches = new ArrayList<>(); + int id = 0; + for (int i = 0; i < 10; i++) { + List deletes = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + deletes.add(Helpers.createDelete(rowKeyFromId(id), columnFamily1, qualifier1)); + id++; + } + deleteBatches.add(t.delete(deletes)); + } + CompletableFuture.allOf( + deleteBatches.stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + .toArray(new CompletableFuture[0])) + .get(); + } + databaseHelpers.verifyTableConsistency(tableName2); + } + + @Test + public void testDeleteWithPrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + // Fill tables before forcing operations to fail. + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + final TableName tableName2 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName2, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> deleteFutures = + IntStream.range(0, databaseEntriesCount) + .mapToObj( + i -> t.delete(Helpers.createDelete(rowKeyFromId(i), columnFamily1, qualifier1))) + .collect(Collectors.toList()); + + CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < deleteFutures.size(); i++) { + checkIfShouldHaveThrown(deleteFutures.get(i), rowKeyFromId(i)); + } + } + databaseHelpers.verifyTableConsistency(tableName1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List>> deleteBatches = new ArrayList<>(); + int id = 0; + for (int i = 0; i < databaseEntriesCount / 100; i++) { + List deletes = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + deletes.add(Helpers.createDelete(rowKeyFromId(id), columnFamily1, qualifier1)); + id++; + } + deleteBatches.add(t.delete(deletes)); + } + List> flatFutures = + deleteBatches.stream().flatMap(List::stream).collect(Collectors.toList()); + CompletableFuture.allOf(flatFutures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < flatFutures.size(); i++) { + checkIfShouldHaveThrown(flatFutures.get(i), rowKeyFromId(i)); + } + + assertThat(flatFutures.stream().filter(CompletableFuture::isCompletedExceptionally).count()) + .isEqualTo(flatFutures.stream().filter(f -> !f.isCompletedExceptionally()).count()); + } + databaseHelpers.verifyTableConsistency(tableName2); + } + + @Test + public void testDeleteWithSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + // Fill tables before forcing operations to fail. + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + final TableName tableName2 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName2, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.BAD_FAMILY, "failed"); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> deleteFutures = + IntStream.range(0, databaseEntriesCount) + .mapToObj( + i -> t.delete(Helpers.createDelete(rowKeyFromId(i), columnFamily1, qualifier1))) + .collect(Collectors.toList()); + + CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])).get(); + } + assertThat(databaseHelpers.countRows(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(0); + assertThat(databaseHelpers.countRows(tableName1, DatabaseHelpers.DatabaseSelector.SECONDARY)) + .isEqualTo(databaseEntriesCount / 2); + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext2 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName2); + + List>> deleteBatches = new ArrayList<>(); + int id = 0; + for (int i = 0; i < 10; i++) { + List deletes = new ArrayList<>(); + for (int j = 0; j < 100; j++) { + deletes.add(Helpers.createDelete(rowKeyFromId(id), columnFamily1, qualifier1)); + id++; + } + deleteBatches.add(t.delete(deletes)); + } + CompletableFuture.allOf( + deleteBatches.stream() + .flatMap(List::stream) + .collect(Collectors.toList()) + .toArray(new CompletableFuture[0])) + .get(); + } + assertThat(databaseHelpers.countRows(tableName2, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(0); + assertThat(databaseHelpers.countRows(tableName2, DatabaseHelpers.DatabaseSelector.SECONDARY)) + .isEqualTo(databaseEntriesCount / 2); + reportedErrorsContext2.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testCheckAndPut() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifEquals(Longs.toByteArray(i)) + .thenPut(Helpers.createPut(i, columnFamily1, qualifier2))); + futures.add( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.EQUAL, Longs.toByteArray(i)) + .thenPut(Helpers.createPut(i, columnFamily1, qualifier3))); + futures.add( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.GREATER, Longs.toByteArray(i + 1)) + .thenPut(Helpers.createPut(i, columnFamily1, qualifier4))); + futures.add( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.NOT_EQUAL, Longs.toByteArray(i)) + .thenPut(Helpers.createPut(i, columnFamily1, qualifier5))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + assertThat(databaseHelpers.countRows(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount); + assertThat(databaseHelpers.countCells(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount * 4); + databaseHelpers.verifyTableConsistency(tableName1); + } + + @Test + public void testCheckAndPutPrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < databaseEntriesCount; i++) { + final byte[] rowKeyAndValue = rowKeyFromId(i); + futures.add( + t.checkAndMutate(rowKeyAndValue, columnFamily1) + .qualifier(qualifier1) + .ifEquals(rowKeyAndValue) + .thenPut(Helpers.createPut(i, columnFamily1, qualifier2))); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < futures.size(); i++) { + checkIfShouldHaveThrown(futures.get(i), rowKeyFromId(i)); + } + } + assertThat(databaseHelpers.countRows(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount); + assertThat(databaseHelpers.countCells(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo((int) (databaseEntriesCount * 1.5)); + databaseHelpers.verifyTableConsistency(tableName1); + } + + @Test + public void testCheckAndPutSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKeyAndValue = rowKeyFromId(i); + futures.add( + t.checkAndMutate(rowKeyAndValue, columnFamily1) + .qualifier(qualifier1) + .ifEquals(rowKeyAndValue) + .thenPut(Helpers.createPut(i, columnFamily1, qualifier2))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + + for (CompletableFuture fut : futures) { + assertThat(fut.getNow(false)).isEqualTo(true); + } + } + + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.PRIMARY, + Helpers.createScan(columnFamily1, qualifier2))) + .isEqualTo(databaseEntriesCount); + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.SECONDARY, + Helpers.createScan(columnFamily1, qualifier2))) + .isEqualTo(databaseEntriesCount / 2); + assertThat(databaseHelpers.countCells(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount * 2); + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testCheckAndDelete() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable( + tableName1, + databaseEntriesCount, + columnFamily1, + qualifier1, + qualifier2, + qualifier3, + qualifier4, + qualifier5); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + assertThat( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifEquals(Longs.toByteArray(i)) + .thenDelete( + Helpers.createDelete(Longs.toByteArray(i), columnFamily1, qualifier2)) + .get()) + .isTrue(); + assertThat( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.EQUAL, Longs.toByteArray(i)) + .thenDelete( + Helpers.createDelete(Longs.toByteArray(i), columnFamily1, qualifier3)) + .get()) + .isTrue(); + assertThat( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.GREATER, Longs.toByteArray(i + 1)) + .thenDelete( + Helpers.createDelete(Longs.toByteArray(i), columnFamily1, qualifier4)) + .get()) + .isTrue(); + assertThat( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.NOT_EQUAL, Longs.toByteArray(i)) + .thenDelete( + Helpers.createDelete(Longs.toByteArray(i), columnFamily1, qualifier5)) + .get()) + .isFalse(); + } + } + + assertThat(databaseHelpers.countRows(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount); + + assertThat(databaseHelpers.countCells(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount * 2); + + databaseHelpers.verifyTableConsistency(tableName1); + } + + @Test + public void testCheckAndDeletePrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable( + tableName1, databaseEntriesCount, columnFamily1, qualifier1, qualifier2); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + final byte[] rowKeyAndValue = rowKeyFromId(i); + futures.add( + t.checkAndMutate(rowKeyAndValue, columnFamily1) + .qualifier(qualifier1) + .ifEquals(rowKeyAndValue) + .thenDelete(Helpers.createDelete(rowKeyAndValue, columnFamily1, qualifier2))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < futures.size(); i++) { + checkIfShouldHaveThrown(futures.get(i), rowKeyFromId(i)); + } + } + + assertThat(databaseHelpers.countRows(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount); + + assertThat(databaseHelpers.countCells(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo((int) (databaseEntriesCount * 1.5)); + + databaseHelpers.verifyTableConsistency(tableName1); + } + + @Test + public void testCheckAndDeleteSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKeyAndValue = rowKeyFromId(i); + futures.add( + t.checkAndMutate(rowKeyAndValue, columnFamily1) + .qualifier(qualifier1) + .ifEquals(rowKeyAndValue) + .thenDelete(Helpers.createDelete(rowKeyAndValue, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + for (CompletableFuture fut : futures) { + assertThat(fut.getNow(false)).isEqualTo(true); + } + } + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.PRIMARY, + Helpers.createScan(columnFamily1, qualifier1))) + .isEqualTo(0); + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.SECONDARY, + Helpers.createScan(columnFamily1, qualifier1))) + .isEqualTo(databaseEntriesCount / 2); + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testCheckAndMutate() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + assertThat( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.EQUAL, Longs.toByteArray(i)) + .thenMutate( + Helpers.createRowMutations( + rowKey, + Helpers.createPut(i, columnFamily1, qualifier2), + Helpers.createDelete(rowKey, columnFamily1, qualifier1))) + .get()) + .isTrue(); + assertThat( + t.checkAndMutate(rowKey, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.EQUAL, Longs.toByteArray(i)) + .thenMutate( + Helpers.createRowMutations( + rowKey, Helpers.createDelete(rowKey, columnFamily1, qualifier2))) + .get()) + .isFalse(); + } + } + + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.PRIMARY, + Helpers.createScan(columnFamily1, qualifier1))) + .isEqualTo(0); + + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.PRIMARY, + Helpers.createScan(columnFamily1, qualifier2))) + .isEqualTo(databaseEntriesCount); + + databaseHelpers.verifyTableConsistency(tableName1); + } + + @Test + public void testCheckAndMutatePrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable( + tableName1, databaseEntriesCount, columnFamily1, qualifier1, qualifier2); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + final byte[] rowKeyAndValue = rowKeyFromId(i); + futures.add( + t.checkAndMutate(rowKeyAndValue, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.EQUAL, rowKeyAndValue) + .thenMutate( + Helpers.createRowMutations( + rowKeyAndValue, + Helpers.createDelete(rowKeyAndValue, columnFamily1, qualifier2)))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < futures.size(); i++) { + checkIfShouldHaveThrown(futures.get(i), rowKeyFromId(i)); + } + } + + assertThat(databaseHelpers.countRows(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo(databaseEntriesCount); + + assertThat(databaseHelpers.countCells(tableName1, DatabaseHelpers.DatabaseSelector.PRIMARY)) + .isEqualTo((int) (databaseEntriesCount * 1.5)); + + databaseHelpers.verifyTableConsistency(tableName1); + } + + // TODO(mwalkiewicz): fix + @Ignore("Fails for unknown reasons") + @Test + public void testCheckAndMutateSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKeyAndValue = rowKeyFromId(i); + futures.add( + t.checkAndMutate(rowKeyAndValue, columnFamily1) + .qualifier(qualifier1) + .ifMatches(CompareOperator.EQUAL, rowKeyAndValue) + .thenMutate( + Helpers.createRowMutations( + rowKeyAndValue, + Helpers.createDelete(rowKeyAndValue, columnFamily1, qualifier1)))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + + for (CompletableFuture fut : futures) { + assertThat(fut.getNow(false)).isEqualTo(true); + } + } + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.PRIMARY, + Helpers.createScan(columnFamily1, qualifier1))) + .isEqualTo(0); + assertThat( + databaseHelpers.countRows( + tableName1, + DatabaseHelpers.DatabaseSelector.SECONDARY, + Helpers.createScan(columnFamily1, qualifier1))) + .isEqualTo(databaseEntriesCount / 2); + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testIncrement() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.increment(Helpers.createIncrement(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + + databaseHelpers.verifyTableConsistency(tableName1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + Result r = t.get(Helpers.createGet(rowKey, columnFamily1, qualifier1)).get(); + assertThat(Longs.fromByteArray(r.getValue(columnFamily1, qualifier1))).isEqualTo(i + 1); + } + } + } + + @Test + public void testIncrementPrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.increment(Helpers.createIncrement(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < futures.size(); i++) { + checkIfShouldHaveThrown(futures.get(i), rowKeyFromId(i)); + } + } + + databaseHelpers.verifyTableConsistency(tableName1); + } + + @Test + public void testIncrementSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + TestWriteErrorConsumer.clearErrors(); + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.increment(Helpers.createIncrement(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + + assertThat(TestWriteErrorConsumer.getErrorCount()).isEqualTo(databaseEntriesCount / 2); + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testAppend() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add( + t.append(Helpers.createAppend(rowKey, columnFamily1, qualifier1, new byte[] {1}))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + + databaseHelpers.verifyTableConsistency(tableName1); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + Result r = t.get(Helpers.createGet(rowKey, columnFamily1, qualifier1)).get(); + byte[] expectedValue = new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 1}; + System.arraycopy(rowKey, 0, expectedValue, 0, 8); + assertThat(r.getValue(columnFamily1, qualifier1)).isEqualTo(expectedValue); + } + } + } + + @Test + public void testAppendPrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add( + t.append(Helpers.createAppend(rowKey, columnFamily1, qualifier1, new byte[] {1}))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + for (int i = 0; i < futures.size(); i++) { + checkIfShouldHaveThrown(futures.get(i), rowKeyFromId(i)); + } + } + + databaseHelpers.verifyTableConsistency(tableName1); + } + + @Test + public void testAppendSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + TestWriteErrorConsumer.clearErrors(); + + FailingHBaseHRegion.failMutation( + failPredicate, HConstants.OperationStatusCode.SANITY_CHECK_FAILURE, "failed"); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add( + t.append(Helpers.createAppend(rowKey, columnFamily1, qualifier1, new byte[] {1}))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + + assertThat(TestWriteErrorConsumer.getErrorCount()).isEqualTo(databaseEntriesCount / 2); + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testGet() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.get(Helpers.createGet(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + assertThat(MismatchDetectorCounter.getInstance().getErrorCount()).isEqualTo(0); + } + + @Test + public void testGetWithPrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.get(Helpers.createGet(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < futures.size(); i++) { + checkIfShouldHaveThrown(futures.get(i), rowKeyFromId(i)); + } + } + assertThat(MismatchDetectorCounter.getInstance().getErrorCount()).isEqualTo(0); + } + + @Test + public void testGetWithSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.get(Helpers.createGet(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } + assertThat(MismatchDetectorCounter.getInstance().getErrorCount()) + .isEqualTo(databaseEntriesCount / 2); + assertThat(MismatchDetectorCounter.getInstance().getErrorCount("failure")) + .isEqualTo(databaseEntriesCount / 2); + assertThat(MismatchDetectorCounter.getInstance().getErrorCount("mismatch")).isEqualTo(0); + } + + @Test + public void testExists() throws IOException, ExecutionException, InterruptedException { + int databaseEntriesCount = 1000; + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.exists(Helpers.createGet(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + assertThat(futures.stream().allMatch(fut -> fut.getNow(false))).isTrue(); + } + assertThat(MismatchDetectorCounter.getInstance().getErrorCount()).isEqualTo(0); + } + + @Test + public void testExistsWithPrimaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.exists(Helpers.createGet(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < futures.size(); i++) { + checkIfShouldHaveThrown(futures.get(i), rowKeyFromId(i)); + } + } + assertThat(MismatchDetectorCounter.getInstance().getErrorCount()).isEqualTo(0); + } + + @Test + public void testExistsWithSecondaryErrors() + throws IOException, ExecutionException, InterruptedException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + final TableName tableName1 = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName1, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName1); + List> futures = new ArrayList<>(); + for (int i = 0; i < databaseEntriesCount; i++) { + byte[] rowKey = rowKeyFromId(i); + futures.add(t.exists(Helpers.createGet(rowKey, columnFamily1, qualifier1))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + assertThat(futures.stream().allMatch(fut -> fut.getNow(false))).isTrue(); + } + assertThat(MismatchDetectorCounter.getInstance().getErrorCount()) + .isEqualTo(databaseEntriesCount / 2); + assertThat(MismatchDetectorCounter.getInstance().getErrorCount("failure")) + .isEqualTo(databaseEntriesCount / 2); + assertThat(MismatchDetectorCounter.getInstance().getErrorCount("mismatch")).isEqualTo(0); + } + + @Test + public void testBatch() throws IOException, InterruptedException, ExecutionException { + int databaseEntriesCount = 1000; + + final TableName tableName = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName); + + List>> batches = new ArrayList<>(); + int id = 0; + while (id < databaseEntriesCount) { + List batch = new ArrayList<>(); + for (int j = 0; j < 100 && id < databaseEntriesCount; j++) { + batch.add(Helpers.createPut(id, columnFamily1, qualifier1)); + id++; + } + batches.add(t.batch(batch)); + } + List> flatResults = + batches.stream().flatMap(List::stream).collect(Collectors.toList()); + CompletableFuture.allOf(flatResults.toArray(new CompletableFuture[0])).get(); + } + databaseHelpers.verifyTableConsistency(tableName); + } + + @Test + public void testBatchWithPrimaryErrors() + throws IOException, InterruptedException, ExecutionException { + Assume.assumeTrue( + ConfigurationHelper.isPrimaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + final TableName tableName = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName); + + List>> batches = new ArrayList<>(); + int id = 0; + while (id < databaseEntriesCount) { + List batch = new ArrayList<>(); + for (int j = 0; j < 100 && id < databaseEntriesCount; j++) { + batch.add(Helpers.createPut(id, columnFamily1, qualifier1)); + id++; + } + batches.add(t.batch(batch)); + } + List> flatResults = + batches.stream().flatMap(List::stream).collect(Collectors.toList()); + CompletableFuture.allOf(flatResults.toArray(new CompletableFuture[0])) + .exceptionally(e -> null) + .get(); + + for (int i = 0; i < flatResults.size(); i++) { + checkIfShouldHaveThrown(flatResults.get(i), rowKeyFromId(i)); + } + } + databaseHelpers.verifyTableConsistency(tableName); + } + + @Test + public void testBatchWithSecondaryErrors() + throws IOException, InterruptedException, ExecutionException { + Assume.assumeTrue( + ConfigurationHelper.isSecondaryHBase() && ConfigurationHelper.isUsingHBaseMiniCluster()); + + int databaseEntriesCount = 1000; + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + TestMirroringTable.ReportedErrorsContext reportedErrorsContext1 = + new TestMirroringTable.ReportedErrorsContext(); + final TableName tableName = connectionRule.createTable(columnFamily1); + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable t = asyncConnection.getTable(tableName); + + List>> batches = new ArrayList<>(); + int id = 0; + while (id < databaseEntriesCount) { + List batch = new ArrayList<>(); + for (int j = 0; j < 100 && id < databaseEntriesCount; j++) { + batch.add(Helpers.createPut(id, columnFamily1, qualifier1)); + id++; + } + batches.add(t.batch(batch)); + } + List> flatResults = + batches.stream().flatMap(List::stream).collect(Collectors.toList()); + CompletableFuture.allOf(flatResults.toArray(new CompletableFuture[0])).get(); + } + databaseHelpers.verifyTableConsistency(tableName, failPredicate); + reportedErrorsContext1.assertNewErrorsReported(databaseEntriesCount / 2); + } + + @Test + public void testResultScanner() throws IOException { + int databaseEntriesCount = 1000; + + TableName tableName = connectionRule.createTable(columnFamily1); + databaseHelpers.fillTable(tableName, databaseEntriesCount, columnFamily1, qualifier1); + + FailingHBaseHRegion.failMutation(failPredicate, "failed"); + + try (MirroringAsyncConnection asyncConnection = + asyncConnectionRule.createAsyncConnection(config)) { + AsyncTable table = asyncConnection.getTable(tableName); + + try (ResultScanner scanner = table.getScanner(columnFamily1)) { + assertThat(Iterators.size(scanner.iterator())).isEqualTo(databaseEntriesCount); + } + } + } + + private void checkIfShouldHaveThrown(CompletableFuture future, byte[] rowKey) { + assertThat(failPredicate.apply(rowKey)).isEqualTo(future.isCompletedExceptionally()); + } +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/AsyncConnectionRule.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/AsyncConnectionRule.java new file mode 100644 index 0000000000..151dbb31ac --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/java/com/google/cloud/bigtable/hbase/mirroring/utils/AsyncConnectionRule.java @@ -0,0 +1,42 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.hbase.mirroring.utils; + +import com.google.cloud.bigtable.mirroring.hbase2_x.MirroringAsyncConnection; +import java.util.concurrent.ExecutionException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.AsyncConnection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.junit.rules.ExternalResource; + +public class AsyncConnectionRule extends ExternalResource { + private final ConnectionRule connectionRule; + + public AsyncConnectionRule(ConnectionRule connectionRule) { + this.connectionRule = connectionRule; + } + + public MirroringAsyncConnection createAsyncConnection(Configuration configuration) { + this.connectionRule.updateConfigurationWithHbaseMiniClusterProps(configuration); + + try { + AsyncConnection conn = ConnectionFactory.createAsyncConnection(configuration).get(); + return (MirroringAsyncConnection) conn; + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/bigtable-to-hbase-local-configuration.xml b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/bigtable-to-hbase-local-configuration.xml new file mode 100644 index 0000000000..9e4fd25c80 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/bigtable-to-hbase-local-configuration.xml @@ -0,0 +1,66 @@ + + + hbase.client.connection.impl + com.google.cloud.bigtable.mirroring.hbase2_x.MirroringConnection + + + + google.bigtable.mirroring.primary-client.connection.impl + com.google.cloud.bigtable.hbase2_x.BigtableConnection + + + + google.bigtable.mirroring.primary-client.async.connection.impl + org.apache.hadoop.hbase.client.BigtableAsyncConnection + + + + google.bigtable.mirroring.secondary-client.connection.impl + default + + + + google.bigtable.mirroring.secondary-client.async.connection.impl + default + + + + google.bigtable.project.id + fake-project + + + + google.bigtable.instance.id + fake-instance + + + + google.bigtable.use.gcj.client + false + + + + hbase.client.retries.number + 2 + + + + use-hbase-mini-cluster + true + + + + google.bigtable.mirroring.write-error-log.appender.prefix-path + /tmp/write-error-log + + + + google.bigtable.mirroring.write-error-log.appender.max-buffer-size + 8388608 + + + + google.bigtable.mirroring.write-error-log.appender.drop-on-overflow + false + + diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/hbase-to-bigtable-local-configuration.xml b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/hbase-to-bigtable-local-configuration.xml new file mode 100644 index 0000000000..ee08d55bfb --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/hbase-to-bigtable-local-configuration.xml @@ -0,0 +1,66 @@ + + + hbase.client.connection.impl + com.google.cloud.bigtable.mirroring.hbase2_x.MirroringConnection + + + + google.bigtable.mirroring.primary-client.connection.impl + default + + + + google.bigtable.mirroring.primary-client.async.connection.impl + default + + + + google.bigtable.mirroring.secondary-client.connection.impl + com.google.cloud.bigtable.hbase2_x.BigtableConnection + + + + google.bigtable.mirroring.secondary-client.async.connection.impl + org.apache.hadoop.hbase.client.BigtableAsyncConnection + + + + google.bigtable.project.id + fake-project + + + + google.bigtable.instance.id + fake-instance + + + + google.bigtable.use.gcj.client + false + + + + hbase.client.retries.number + 2 + + + + use-hbase-mini-cluster + true + + + + google.bigtable.mirroring.write-error-log.appender.prefix-path + /tmp/write-error-log + + + + google.bigtable.mirroring.write-error-log.appender.max-buffer-size + 8388608 + + + + google.bigtable.mirroring.write-error-log.appender.drop-on-overflow + false + + diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/log4j.properties b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/log4j.properties new file mode 100644 index 0000000000..f0dbf50014 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/log4j.properties @@ -0,0 +1,7 @@ +log4j.rootLogger=WARN, console + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.Target=System.out +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=[%-20t] %-5p %-20c{1} - %m%n +log4j.logger.com.google.cloud.bigtable.mirroring=OFF diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/prometheus.yml b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/prometheus.yml new file mode 100644 index 0000000000..63c3fc1a27 --- /dev/null +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x-integration-tests/src/test/resources/prometheus.yml @@ -0,0 +1,13 @@ +global: + scrape_interval: 5s + + external_labels: + monitor: 'bigtable-mirroring-client-integration-tests' + +scrape_configs: + - job_name: 'bigtable-mirroring-client-integration-tests' + + scrape_interval: 5s + + static_configs: + - targets: ['localhost:8888'] diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java index 9eab1fa696..00076bab95 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/main/java/com/google/cloud/bigtable/mirroring/hbase2_x/MirroringAsyncConnection.java @@ -107,7 +107,7 @@ public MirroringAsyncConnection( ReflectionConstructor.construct( this.configuration.mirroringOptions.writeErrorLogAppenderClass, Configuration.class, - this.configuration), + this.configuration.baseConfiguration), ReflectionConstructor.construct( this.configuration.mirroringOptions.writeErrorLogSerializerClass)); @@ -122,6 +122,14 @@ public MirroringAsyncConnection( this.executorService = Executors.newCachedThreadPool(); } + public AsyncConnection getPrimaryConnection() { + return this.primaryConnection; + } + + public AsyncConnection getSecondaryConnection() { + return this.secondaryConnection; + } + @Override public Configuration getConfiguration() { return this.configuration.baseConfiguration; diff --git a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java index 9c399e7e2d..5c6b147d52 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java +++ b/bigtable-hbase-mirroring-client-2.x-parent/bigtable-hbase-mirroring-client-2.x/src/test/java/com/google/cloud/bigtable/mirroring/hbase2_x/TestMirroringAsyncTable.java @@ -712,6 +712,39 @@ public void testConditionalWriteWhenPrimaryErred() verify(secondaryTable, never()).put(any(Put.class)); } + @Test + public void testConditionalWriteHappensWhenSecondaryErred() + throws ExecutionException, InterruptedException, IOException { + byte[] row = "r1".getBytes(); + Put put = new Put(row); + RowMutations mutations = new RowMutations(row); + mutations.add(put); + CompletableFuture primaryFuture = new CompletableFuture<>(); + when(primaryBuilder.thenMutate(mutations)).thenReturn(primaryFuture); + + IOException ioe = new IOException("expected"); + CompletableFuture exceptionalFuture = new CompletableFuture<>(); + exceptionalFuture.completeExceptionally(ioe); + when(secondaryTable.mutateRow(mutations)).thenReturn(exceptionalFuture); + + verify(referenceCounter, never()).incrementReferenceCount(); + verify(referenceCounter, never()).decrementReferenceCount(); + CompletableFuture resultFuture = + mirroringTable.checkAndMutate("r1".getBytes(), "f1".getBytes()).thenMutate(mutations); + + verify(referenceCounter, times(1)).incrementReferenceCount(); + primaryFuture.complete(true); + // The reference count is incremented once at the beginning of checkAndMutate() and then for the + // second time in writeWithControlFlow(). + // It's done this way so that the reference counting invariant isn't violated when refactoring + // brittle code around forwarding result of writeWithFlowControl(). + resultFuture.get(); + verify(secondaryTable, times(1)).mutateRow(mutations); + + verify(referenceCounter, times(2)).incrementReferenceCount(); + verify(referenceCounter, times(2)).decrementReferenceCount(); + } + @Test public void testCheckAndMutateBuilderChainingWhenInPlace() { byte[] qual = "q1".getBytes(); diff --git a/bigtable-hbase-mirroring-client-2.x-parent/pom.xml b/bigtable-hbase-mirroring-client-2.x-parent/pom.xml index 7bc240c613..dc7f2d2e3e 100644 --- a/bigtable-hbase-mirroring-client-2.x-parent/pom.xml +++ b/bigtable-hbase-mirroring-client-2.x-parent/pom.xml @@ -45,5 +45,6 @@ limitations under the License. bigtable-hbase-mirroring-client-2.x bigtable-hbase-mirroring-client-1.x-2.x-integration-tests + bigtable-hbase-mirroring-client-2.x-integration-tests