diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 89fca4fad4d3..b34e2a1daacc 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -92,9 +92,7 @@ public SyncTable(Configuration conf) { private void initCredentialsForHBase(String zookeeper, Job job) throws IOException { Configuration peerConf = HBaseConfiguration.createClusterConf(job.getConfiguration(), zookeeper); - if ("kerberos".equalsIgnoreCase(peerConf.get("hbase.security.authentication"))) { - TableMapReduceUtil.initCredentialsForCluster(job, peerConf); - } + TableMapReduceUtil.initCredentialsForCluster(job, peerConf); } public Job createSubmittableJob(String[] args) throws IOException { @@ -172,12 +170,6 @@ public Job createSubmittableJob(String[] args) throws IOException { // would be nice to add an option for bulk load instead } - // Obtain an authentication token, for the specified cluster, on behalf of the current user - if (sourceZkCluster != null) { - Configuration peerConf = - HBaseConfiguration.createClusterConf(job.getConfiguration(), sourceZkCluster); - TableMapReduceUtil.initCredentialsForCluster(job, peerConf); - } return job; } @@ -220,7 +212,6 @@ public static enum Counter { @Override protected void setup(Context context) throws IOException { - Configuration conf = context.getConfiguration(); sourceHashDir = new Path(conf.get(SOURCE_HASH_DIR_CONF_KEY)); sourceConnection = openConnection(conf, SOURCE_ZK_CLUSTER_CONF_KEY, null); @@ -292,9 +283,7 @@ protected void map(ImmutableBytesWritable key, Result value, Context context) } } catch (Throwable t) { mapperException = t; - Throwables.propagateIfInstanceOf(t, IOException.class); - Throwables.propagateIfInstanceOf(t, InterruptedException.class); - Throwables.propagate(t); + throw t; } } @@ -693,9 +682,9 @@ protected void cleanup(Context context) throws IOException, InterruptedException // propagate first exception if (mapperException != null) { - Throwables.propagateIfInstanceOf(mapperException, IOException.class); - Throwables.propagateIfInstanceOf(mapperException, InterruptedException.class); - Throwables.propagate(mapperException); + Throwables.throwIfInstanceOf(mapperException, IOException.class); + Throwables.throwIfInstanceOf(mapperException, InterruptedException.class); + Throwables.throwIfUnchecked(mapperException); } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableTestBase.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableTestBase.java new file mode 100644 index 000000000000..d7648c26406d --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/CopyTableTestBase.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.mob.MobTestUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Rule; +import org.junit.rules.TestName; + +/** + * Base class for testing CopyTable MR tool. + */ +public abstract class CopyTableTestBase { + + protected static final byte[] ROW1 = Bytes.toBytes("row1"); + protected static final byte[] ROW2 = Bytes.toBytes("row2"); + protected static final String FAMILY_A_STRING = "a"; + protected static final String FAMILY_B_STRING = "b"; + protected static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING); + protected static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING); + protected static final byte[] QUALIFIER = Bytes.toBytes("q"); + + @Rule + public TestName name = new TestName(); + + protected abstract Table createSourceTable(TableDescriptor desc) throws Exception; + + protected abstract Table createTargetTable(TableDescriptor desc) throws Exception; + + protected abstract void dropSourceTable(TableName tableName) throws Exception; + + protected abstract void dropTargetTable(TableName tableName) throws Exception; + + protected abstract String[] getPeerClusterOptions() throws Exception; + + protected final void loadData(Table t, byte[] family, byte[] column) throws IOException { + for (int i = 0; i < 10; i++) { + byte[] row = Bytes.toBytes("row" + i); + Put p = new Put(row); + p.addColumn(family, column, row); + t.put(p); + } + } + + protected final void verifyRows(Table t, byte[] family, byte[] column) throws IOException { + for (int i = 0; i < 10; i++) { + byte[] row = Bytes.toBytes("row" + i); + Get g = new Get(row).addFamily(family); + Result r = t.get(g); + assertNotNull(r); + assertEquals(1, r.size()); + Cell cell = r.rawCells()[0]; + assertTrue(CellUtil.matchingQualifier(cell, column)); + assertEquals(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), + cell.getValueLength(), row, 0, row.length), 0); + } + } + + protected final void doCopyTableTest(Configuration conf, boolean bulkload) throws Exception { + TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); + byte[] family = Bytes.toBytes("family"); + byte[] column = Bytes.toBytes("c1"); + TableDescriptor desc1 = TableDescriptorBuilder.newBuilder(tableName1) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + TableDescriptor desc2 = TableDescriptorBuilder.newBuilder(tableName2) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + + try (Table t1 = createSourceTable(desc1); Table t2 = createTargetTable(desc2)) { + // put rows into the first table + loadData(t1, family, column); + + String[] peerClusterOptions = getPeerClusterOptions(); + if (bulkload) { + assertTrue(runCopy(conf, + ArrayUtils.addAll(peerClusterOptions, "--new.name=" + tableName2.getNameAsString(), + "--bulkload", tableName1.getNameAsString()))); + } else { + assertTrue(runCopy(conf, ArrayUtils.addAll(peerClusterOptions, + "--new.name=" + tableName2.getNameAsString(), tableName1.getNameAsString()))); + } + + // verify the data was copied into table 2 + verifyRows(t2, family, column); + } finally { + dropSourceTable(tableName1); + dropTargetTable(tableName2); + } + } + + protected final void doCopyTableTestWithMob(Configuration conf, boolean bulkload) + throws Exception { + TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); + byte[] family = Bytes.toBytes("mob"); + byte[] column = Bytes.toBytes("c1"); + + ColumnFamilyDescriptorBuilder cfd = ColumnFamilyDescriptorBuilder.newBuilder(family); + + cfd.setMobEnabled(true); + cfd.setMobThreshold(5); + TableDescriptor desc1 = + TableDescriptorBuilder.newBuilder(tableName1).setColumnFamily(cfd.build()).build(); + TableDescriptor desc2 = + TableDescriptorBuilder.newBuilder(tableName2).setColumnFamily(cfd.build()).build(); + + try (Table t1 = createSourceTable(desc1); Table t2 = createTargetTable(desc2)) { + // put rows into the first table + for (int i = 0; i < 10; i++) { + Put p = new Put(Bytes.toBytes("row" + i)); + p.addColumn(family, column, column); + t1.put(p); + } + + String[] peerClusterOptions = getPeerClusterOptions(); + if (bulkload) { + assertTrue(runCopy(conf, + ArrayUtils.addAll(peerClusterOptions, "--new.name=" + tableName2.getNameAsString(), + "--bulkload", tableName1.getNameAsString()))); + } else { + assertTrue(runCopy(conf, ArrayUtils.addAll(peerClusterOptions, + "--new.name=" + tableName2.getNameAsString(), tableName1.getNameAsString()))); + } + + // verify the data was copied into table 2 + for (int i = 0; i < 10; i++) { + Get g = new Get(Bytes.toBytes("row" + i)); + Result r = t2.get(g); + assertEquals(1, r.size()); + assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], column)); + assertEquals("compare row values between two tables", + t1.getDescriptor().getValue("row" + i), t2.getDescriptor().getValue("row" + i)); + } + + assertEquals("compare count of mob rows after table copy", MobTestUtil.countMobRows(t1), + MobTestUtil.countMobRows(t2)); + assertEquals("compare count of mob row values between two tables", + t1.getDescriptor().getValues().size(), t2.getDescriptor().getValues().size()); + assertTrue("The mob row count is 0 but should be > 0", MobTestUtil.countMobRows(t2) > 0); + } finally { + dropSourceTable(tableName1); + dropTargetTable(tableName2); + } + } + + protected final boolean runCopy(Configuration conf, String[] args) throws Exception { + int status = ToolRunner.run(conf, new CopyTable(), args); + return status == 0; + } + + protected final void testStartStopRow(Configuration conf) throws Exception { + final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); + final byte[] family = Bytes.toBytes("family"); + final byte[] column = Bytes.toBytes("c1"); + final byte[] row0 = Bytes.toBytesBinary("\\x01row0"); + final byte[] row1 = Bytes.toBytesBinary("\\x01row1"); + final byte[] row2 = Bytes.toBytesBinary("\\x01row2"); + TableDescriptor desc1 = TableDescriptorBuilder.newBuilder(tableName1) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + TableDescriptor desc2 = TableDescriptorBuilder.newBuilder(tableName2) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build(); + try (Table t1 = createSourceTable(desc1); Table t2 = createTargetTable(desc2)) { + // put rows into the first table + Put p = new Put(row0); + p.addColumn(family, column, column); + t1.put(p); + p = new Put(row1); + p.addColumn(family, column, column); + t1.put(p); + p = new Put(row2); + p.addColumn(family, column, column); + t1.put(p); + + String[] peerClusterOptions = getPeerClusterOptions(); + assertTrue(runCopy(conf, ArrayUtils.addAll(peerClusterOptions, "--new.name=" + tableName2, + "--startrow=\\x01row1", "--stoprow=\\x01row2", tableName1.getNameAsString()))); + + // verify the data was copied into table 2 + // row1 exist, row0, row2 do not exist + Get g = new Get(row1); + Result r = t2.get(g); + assertEquals(1, r.size()); + assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], column)); + + g = new Get(row0); + r = t2.get(g); + assertEquals(0, r.size()); + + g = new Get(row2); + r = t2.get(g); + assertEquals(0, r.size()); + } finally { + dropSourceTable(tableName1); + dropTargetTable(tableName2); + } + } + + protected final void testRenameFamily(Configuration conf) throws Exception { + TableName sourceTable = TableName.valueOf(name.getMethodName() + "-source"); + TableName targetTable = TableName.valueOf(name.getMethodName() + "-target"); + + TableDescriptor desc1 = TableDescriptorBuilder.newBuilder(sourceTable) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_A)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_B)).build(); + TableDescriptor desc2 = TableDescriptorBuilder.newBuilder(targetTable) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_A)) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_B)).build(); + + try (Table t = createSourceTable(desc1); Table t2 = createTargetTable(desc2)) { + Put p = new Put(ROW1); + p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11")); + p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12")); + p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data13")); + t.put(p); + p = new Put(ROW2); + p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Dat21")); + p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data22")); + p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data23")); + t.put(p); + + long currentTime = EnvironmentEdgeManager.currentTime(); + String[] args = ArrayUtils.addAll(getPeerClusterOptions(), "--new.name=" + targetTable, + "--families=a:b", "--all.cells", "--starttime=" + (currentTime - 100000), + "--endtime=" + (currentTime + 100000), "--versions=1", sourceTable.getNameAsString()); + assertNull(t2.get(new Get(ROW1)).getRow()); + + assertTrue(runCopy(conf, args)); + + assertNotNull(t2.get(new Get(ROW1)).getRow()); + Result res = t2.get(new Get(ROW1)); + byte[] b1 = res.getValue(FAMILY_B, QUALIFIER); + assertEquals("Data13", Bytes.toString(b1)); + assertNotNull(t2.get(new Get(ROW2)).getRow()); + res = t2.get(new Get(ROW2)); + b1 = res.getValue(FAMILY_A, QUALIFIER); + // Data from the family of B is not copied + assertNull(b1); + } finally { + dropSourceTable(sourceTable); + dropTargetTable(targetTable); + } + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java index 8a811e6d654e..da420cfe7a7e 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTable.java @@ -18,37 +18,26 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintStream; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.mob.MobTestUtil; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.LauncherSecurityManager; -import org.apache.hadoop.util.ToolRunner; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; @@ -60,20 +49,13 @@ * Basic test for the CopyTable M/R tool */ @Category({ MapReduceTests.class, LargeTests.class }) -public class TestCopyTable { +public class TestCopyTable extends CopyTableTestBase { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestCopyTable.class); private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final byte[] ROW1 = Bytes.toBytes("row1"); - private static final byte[] ROW2 = Bytes.toBytes("row2"); - private static final String FAMILY_A_STRING = "a"; - private static final String FAMILY_B_STRING = "b"; - private static final byte[] FAMILY_A = Bytes.toBytes(FAMILY_A_STRING); - private static final byte[] FAMILY_B = Bytes.toBytes(FAMILY_B_STRING); - private static final byte[] QUALIFIER = Bytes.toBytes("q"); @Rule public TestName name = new TestName(); @@ -88,95 +70,29 @@ public static void afterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); } - private void doCopyTableTest(boolean bulkload) throws Exception { - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); - final byte[] FAMILY = Bytes.toBytes("family"); - final byte[] COLUMN1 = Bytes.toBytes("c1"); - - try (Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); - Table t2 = TEST_UTIL.createTable(tableName2, FAMILY);) { - // put rows into the first table - loadData(t1, FAMILY, COLUMN1); - - CopyTable copy = new CopyTable(); - int code; - if (bulkload) { - code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy, - new String[] { "--new.name=" + tableName2.getNameAsString(), "--bulkload", - tableName1.getNameAsString() }); - } else { - code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy, new String[] { - "--new.name=" + tableName2.getNameAsString(), tableName1.getNameAsString() }); - } - assertEquals("copy job failed", 0, code); - - // verify the data was copied into table 2 - verifyRows(t2, FAMILY, COLUMN1); - } finally { - TEST_UTIL.deleteTable(tableName1); - TEST_UTIL.deleteTable(tableName2); - } + @Override + protected Table createSourceTable(TableDescriptor desc) throws Exception { + return TEST_UTIL.createTable(desc, null); } - private void doCopyTableTestWithMob(boolean bulkload) throws Exception { - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); - final byte[] FAMILY = Bytes.toBytes("mob"); - final byte[] COLUMN1 = Bytes.toBytes("c1"); - - ColumnFamilyDescriptorBuilder cfd = ColumnFamilyDescriptorBuilder.newBuilder(FAMILY); - - cfd.setMobEnabled(true); - cfd.setMobThreshold(5); - TableDescriptor desc1 = - TableDescriptorBuilder.newBuilder(tableName1).setColumnFamily(cfd.build()).build(); - TableDescriptor desc2 = - TableDescriptorBuilder.newBuilder(tableName2).setColumnFamily(cfd.build()).build(); - - try (Table t1 = TEST_UTIL.createTable(desc1, null); - Table t2 = TEST_UTIL.createTable(desc2, null);) { - - // put rows into the first table - for (int i = 0; i < 10; i++) { - Put p = new Put(Bytes.toBytes("row" + i)); - p.addColumn(FAMILY, COLUMN1, COLUMN1); - t1.put(p); - } - - CopyTable copy = new CopyTable(); + @Override + protected Table createTargetTable(TableDescriptor desc) throws Exception { + return TEST_UTIL.createTable(desc, null); + } - int code; - if (bulkload) { - code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy, - new String[] { "--new.name=" + tableName2.getNameAsString(), "--bulkload", - tableName1.getNameAsString() }); - } else { - code = ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy, new String[] { - "--new.name=" + tableName2.getNameAsString(), tableName1.getNameAsString() }); - } - assertEquals("copy job failed", 0, code); + @Override + protected void dropSourceTable(TableName tableName) throws Exception { + TEST_UTIL.deleteTable(tableName); + } - // verify the data was copied into table 2 - for (int i = 0; i < 10; i++) { - Get g = new Get(Bytes.toBytes("row" + i)); - Result r = t2.get(g); - assertEquals(1, r.size()); - assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1)); - assertEquals("compare row values between two tables", - t1.getDescriptor().getValue("row" + i), t2.getDescriptor().getValue("row" + i)); - } + @Override + protected void dropTargetTable(TableName tableName) throws Exception { + TEST_UTIL.deleteTable(tableName); + } - assertEquals("compare count of mob rows after table copy", - MobTestUtil.countMobRows(TEST_UTIL, t1), MobTestUtil.countMobRows(TEST_UTIL, t2)); - assertEquals("compare count of mob row values between two tables", - t1.getDescriptor().getValues().size(), t2.getDescriptor().getValues().size()); - assertTrue("The mob row count is 0 but should be > 0", - MobTestUtil.countMobRows(TEST_UTIL, t2) > 0); - } finally { - TEST_UTIL.deleteTable(tableName1); - TEST_UTIL.deleteTable(tableName2); - } + @Override + protected String[] getPeerClusterOptions() throws Exception { + return new String[0]; } /** @@ -184,7 +100,7 @@ private void doCopyTableTestWithMob(boolean bulkload) throws Exception { */ @Test public void testCopyTable() throws Exception { - doCopyTableTest(false); + doCopyTableTest(TEST_UTIL.getConfiguration(), false); } /** @@ -192,7 +108,7 @@ public void testCopyTable() throws Exception { */ @Test public void testCopyTableWithBulkload() throws Exception { - doCopyTableTest(true); + doCopyTableTest(TEST_UTIL.getConfiguration(), true); } /** @@ -200,7 +116,7 @@ public void testCopyTableWithBulkload() throws Exception { */ @Test public void testCopyTableWithMob() throws Exception { - doCopyTableTestWithMob(false); + doCopyTableTestWithMob(TEST_UTIL.getConfiguration(), false); } /** @@ -208,58 +124,12 @@ public void testCopyTableWithMob() throws Exception { */ @Test public void testCopyTableWithBulkloadWithMob() throws Exception { - doCopyTableTestWithMob(true); + doCopyTableTestWithMob(TEST_UTIL.getConfiguration(), true); } @Test public void testStartStopRow() throws Exception { - final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); - final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); - final byte[] FAMILY = Bytes.toBytes("family"); - final byte[] COLUMN1 = Bytes.toBytes("c1"); - final byte[] ROW0 = Bytes.toBytesBinary("\\x01row0"); - final byte[] ROW1 = Bytes.toBytesBinary("\\x01row1"); - final byte[] ROW2 = Bytes.toBytesBinary("\\x01row2"); - - Table t1 = TEST_UTIL.createTable(tableName1, FAMILY); - Table t2 = TEST_UTIL.createTable(tableName2, FAMILY); - - // put rows into the first table - Put p = new Put(ROW0); - p.addColumn(FAMILY, COLUMN1, COLUMN1); - t1.put(p); - p = new Put(ROW1); - p.addColumn(FAMILY, COLUMN1, COLUMN1); - t1.put(p); - p = new Put(ROW2); - p.addColumn(FAMILY, COLUMN1, COLUMN1); - t1.put(p); - - CopyTable copy = new CopyTable(); - assertEquals(0, - ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), copy, - new String[] { "--new.name=" + tableName2, "--startrow=\\x01row1", "--stoprow=\\x01row2", - tableName1.getNameAsString() })); - - // verify the data was copied into table 2 - // row1 exist, row0, row2 do not exist - Get g = new Get(ROW1); - Result r = t2.get(g); - assertEquals(1, r.size()); - assertTrue(CellUtil.matchingQualifier(r.rawCells()[0], COLUMN1)); - - g = new Get(ROW0); - r = t2.get(g); - assertEquals(0, r.size()); - - g = new Get(ROW2); - r = t2.get(g); - assertEquals(0, r.size()); - - t1.close(); - t2.close(); - TEST_UTIL.deleteTable(tableName1); - TEST_UTIL.deleteTable(tableName2); + testStartStopRow(TEST_UTIL.getConfiguration()); } /** @@ -267,42 +137,7 @@ public void testStartStopRow() throws Exception { */ @Test public void testRenameFamily() throws Exception { - final TableName sourceTable = TableName.valueOf(name.getMethodName() + "source"); - final TableName targetTable = TableName.valueOf(name.getMethodName() + "-target"); - - byte[][] families = { FAMILY_A, FAMILY_B }; - - Table t = TEST_UTIL.createTable(sourceTable, families); - Table t2 = TEST_UTIL.createTable(targetTable, families); - Put p = new Put(ROW1); - p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data11")); - p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data12")); - p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data13")); - t.put(p); - p = new Put(ROW2); - p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Dat21")); - p.addColumn(FAMILY_A, QUALIFIER, Bytes.toBytes("Data22")); - p.addColumn(FAMILY_B, QUALIFIER, Bytes.toBytes("Data23")); - t.put(p); - - long currentTime = EnvironmentEdgeManager.currentTime(); - String[] args = new String[] { "--new.name=" + targetTable, "--families=a:b", "--all.cells", - "--starttime=" + (currentTime - 100000), "--endtime=" + (currentTime + 100000), - "--versions=1", sourceTable.getNameAsString() }; - assertNull(t2.get(new Get(ROW1)).getRow()); - - assertTrue(runCopy(args)); - - assertNotNull(t2.get(new Get(ROW1)).getRow()); - Result res = t2.get(new Get(ROW1)); - byte[] b1 = res.getValue(FAMILY_B, QUALIFIER); - assertEquals("Data13", new String(b1)); - assertNotNull(t2.get(new Get(ROW2)).getRow()); - res = t2.get(new Get(ROW2)); - b1 = res.getValue(FAMILY_A, QUALIFIER); - // Data from the family of B is not copied - assertNull(b1); - + testRenameFamily(TEST_UTIL.getConfiguration()); } /** @@ -332,35 +167,6 @@ public void testMainMethod() throws Exception { assertTrue(data.toString().contains("Usage:")); } - private boolean runCopy(String[] args) throws Exception { - int status = - ToolRunner.run(new Configuration(TEST_UTIL.getConfiguration()), new CopyTable(), args); - return status == 0; - } - - private void loadData(Table t, byte[] family, byte[] column) throws IOException { - for (int i = 0; i < 10; i++) { - byte[] row = Bytes.toBytes("row" + i); - Put p = new Put(row); - p.addColumn(family, column, row); - t.put(p); - } - } - - private void verifyRows(Table t, byte[] family, byte[] column) throws IOException { - for (int i = 0; i < 10; i++) { - byte[] row = Bytes.toBytes("row" + i); - Get g = new Get(row).addFamily(family); - Result r = t.get(g); - Assert.assertNotNull(r); - Assert.assertEquals(1, r.size()); - Cell cell = r.rawCells()[0]; - Assert.assertTrue(CellUtil.matchingQualifier(cell, column)); - Assert.assertEquals(Bytes.compareTo(cell.getValueArray(), cell.getValueOffset(), - cell.getValueLength(), row, 0, row.length), 0); - } - } - private Table createTable(TableName tableName, byte[] family, boolean isMob) throws IOException { if (isMob) { ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(family) @@ -377,20 +183,26 @@ private void testCopyTableBySnapshot(String tablePrefix, boolean bulkLoad, boole throws Exception { TableName table1 = TableName.valueOf(tablePrefix + 1); TableName table2 = TableName.valueOf(tablePrefix + 2); - Table t1 = createTable(table1, FAMILY_A, isMob); - Table t2 = createTable(table2, FAMILY_A, isMob); - loadData(t1, FAMILY_A, Bytes.toBytes("qualifier")); String snapshot = tablePrefix + "_snapshot"; - TEST_UTIL.getAdmin().snapshot(snapshot, table1); - boolean success; - if (bulkLoad) { - success = - runCopy(new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot }); - } else { - success = runCopy(new String[] { "--snapshot", "--new.name=" + table2, snapshot }); + try (Table t1 = createTable(table1, FAMILY_A, isMob); + Table t2 = createTable(table2, FAMILY_A, isMob)) { + loadData(t1, FAMILY_A, Bytes.toBytes("qualifier")); + TEST_UTIL.getAdmin().snapshot(snapshot, table1); + boolean success; + if (bulkLoad) { + success = runCopy(TEST_UTIL.getConfiguration(), + new String[] { "--snapshot", "--new.name=" + table2, "--bulkload", snapshot }); + } else { + success = runCopy(TEST_UTIL.getConfiguration(), + new String[] { "--snapshot", "--new.name=" + table2, snapshot }); + } + assertTrue(success); + verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier")); + } finally { + TEST_UTIL.getAdmin().deleteSnapshot(snapshot); + TEST_UTIL.deleteTable(table1); + TEST_UTIL.deleteTable(table2); } - Assert.assertTrue(success); - verifyRows(t2, FAMILY_A, Bytes.toBytes("qualifier")); } @Test @@ -413,19 +225,15 @@ public void testLoadingSnapshotAndBulkLoadToMobTable() throws Exception { testCopyTableBySnapshot("testLoadingSnapshotAndBulkLoadToMobTable", true, true); } - @Test - public void testLoadingSnapshotToRemoteCluster() throws Exception { - Assert.assertFalse(runCopy( - new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase", "sourceSnapshotName" })); - } - @Test public void testLoadingSnapshotWithoutSnapshotName() throws Exception { - Assert.assertFalse(runCopy(new String[] { "--snapshot", "--peerAdr=hbase://remoteHBase" })); + assertFalse(runCopy(TEST_UTIL.getConfiguration(), new String[] { "--snapshot" })); } @Test public void testLoadingSnapshotWithoutDestTable() throws Exception { - Assert.assertFalse(runCopy(new String[] { "--snapshot", "sourceSnapshotName" })); + assertFalse( + runCopy(TEST_UTIL.getConfiguration(), new String[] { "--snapshot", "sourceSnapshotName" })); } + } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java new file mode 100644 index 000000000000..8e8ef0913d2f --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestCopyTableToPeerCluster.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.mapreduce; + +import static org.junit.Assert.assertFalse; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Test CopyTable between clusters + */ +@Category({ MapReduceTests.class, LargeTests.class }) +public class TestCopyTableToPeerCluster extends CopyTableTestBase { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCopyTableToPeerCluster.class); + + private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); + + private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); + + @BeforeClass + public static void beforeClass() throws Exception { + UTIL1.startMiniCluster(3); + UTIL2.startMiniCluster(3); + } + + @AfterClass + public static void afterClass() throws Exception { + UTIL1.shutdownMiniCluster(); + UTIL2.shutdownMiniCluster(); + } + + @Override + protected Table createSourceTable(TableDescriptor desc) throws Exception { + return UTIL1.createTable(desc, null); + } + + @Override + protected Table createTargetTable(TableDescriptor desc) throws Exception { + return UTIL2.createTable(desc, null); + } + + @Override + protected void dropSourceTable(TableName tableName) throws Exception { + UTIL1.deleteTable(tableName); + } + + @Override + protected void dropTargetTable(TableName tableName) throws Exception { + UTIL2.deleteTable(tableName); + } + + @Override + protected String[] getPeerClusterOptions() throws Exception { + return new String[] { "--peer.adr=" + UTIL2.getClusterKey() }; + } + + /** + * Simple end-to-end test + */ + @Test + public void testCopyTable() throws Exception { + doCopyTableTest(UTIL1.getConfiguration(), false); + } + + /** + * Simple end-to-end test on table with MOB + */ + @Test + public void testCopyTableWithMob() throws Exception { + doCopyTableTestWithMob(UTIL1.getConfiguration(), false); + } + + @Test + public void testStartStopRow() throws Exception { + testStartStopRow(UTIL1.getConfiguration()); + } + + /** + * Test copy of table from sourceTable to targetTable all rows from family a + */ + @Test + public void testRenameFamily() throws Exception { + testRenameFamily(UTIL1.getConfiguration()); + } + + @Test + public void testBulkLoadNotSupported() throws Exception { + TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); + try (Table t1 = UTIL1.createTable(tableName1, FAMILY_A); + Table t2 = UTIL2.createTable(tableName2, FAMILY_A)) { + assertFalse(runCopy(UTIL1.getConfiguration(), + new String[] { "--new.name=" + tableName2.getNameAsString(), "--bulkload", + "--peer.adr=" + UTIL2.getClusterKey(), tableName1.getNameAsString() })); + } finally { + UTIL1.deleteTable(tableName1); + UTIL2.deleteTable(tableName2); + } + } + + @Test + public void testSnapshotNotSupported() throws Exception { + TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); + TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); + String snapshot = tableName1.getNameAsString() + "_snapshot"; + try (Table t1 = UTIL1.createTable(tableName1, FAMILY_A); + Table t2 = UTIL2.createTable(tableName2, FAMILY_A)) { + UTIL1.getAdmin().snapshot(snapshot, tableName1); + assertFalse(runCopy(UTIL1.getConfiguration(), + new String[] { "--new.name=" + tableName2.getNameAsString(), "--snapshot", + "--peer.adr=" + UTIL2.getClusterKey(), snapshot })); + } finally { + UTIL1.getAdmin().deleteSnapshot(snapshot); + UTIL1.deleteTable(tableName1); + UTIL2.deleteTable(tableName2); + } + + } +} diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java index 4070da2e0e76..732d9d8ff88b 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hbase.mapreduce; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.util.Arrays; +import java.util.function.BooleanSupplier; import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; @@ -35,11 +39,11 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.mapreduce.SyncTable.SyncMapper.Counter; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MapReduceTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.mapreduce.Counters; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; @@ -49,12 +53,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; - /** * Basic test for the SyncTable M/R tool */ -@Category(LargeTests.class) +@Category({ MapReduceTests.class, LargeTests.class }) public class TestSyncTable { @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -62,20 +64,23 @@ public class TestSyncTable { private static final Logger LOG = LoggerFactory.getLogger(TestSyncTable.class); - private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); + + private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); @Rule public TestName name = new TestName(); @BeforeClass public static void beforeClass() throws Exception { - TEST_UTIL.startMiniCluster(3); + UTIL1.startMiniCluster(3); + UTIL2.startMiniCluster(3); } @AfterClass public static void afterClass() throws Exception { - TEST_UTIL.cleanupDataTestDirOnTestFS(); - TEST_UTIL.shutdownMiniCluster(); + UTIL2.shutdownMiniCluster(); + UTIL1.shutdownMiniCluster(); } private static byte[][] generateSplits(int numRows, int numRegions) { @@ -86,16 +91,17 @@ private static byte[][] generateSplits(int numRows, int numRegions) { return splitRows; } - @Test - public void testSyncTable() throws Exception { + private void testSyncTable(HBaseTestingUtility source, HBaseTestingUtility target, + String... options) throws Exception { final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); - Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable"); + Path testDir = source.getDataTestDirOnTestFS(name.getMethodName()); - writeTestData(sourceTableName, targetTableName); - hashSourceTable(sourceTableName, testDir); - Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir); - assertEqualTables(90, sourceTableName, targetTableName, false); + writeTestData(source, sourceTableName, target, targetTableName); + hashSourceTable(source, sourceTableName, testDir); + Counters syncCounters = + syncTables(target.getConfiguration(), sourceTableName, targetTableName, testDir, options); + assertEqualTables(90, source, sourceTableName, target, targetTableName, false); assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); @@ -104,21 +110,37 @@ public void testSyncTable() throws Exception { assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); - TEST_UTIL.deleteTable(sourceTableName); - TEST_UTIL.deleteTable(targetTableName); + source.deleteTable(sourceTableName); + target.deleteTable(targetTableName); + } + + @Test + public void testSyncTable() throws Exception { + testSyncTable(UTIL1, UTIL1); + } + + @Test + public void testSyncTableToPeerCluster() throws Exception { + testSyncTable(UTIL1, UTIL2, "--sourcezkcluster=" + UTIL1.getClusterKey()); + } + + @Test + public void testSyncTableFromSourceToPeerCluster() throws Exception { + testSyncTable(UTIL2, UTIL1, "--sourcezkcluster=" + UTIL2.getClusterKey(), + "--targetzkcluster=" + UTIL1.getClusterKey()); } @Test public void testSyncTableDoDeletesFalse() throws Exception { final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); - Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse"); + Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName()); - writeTestData(sourceTableName, targetTableName); - hashSourceTable(sourceTableName, testDir); - Counters syncCounters = - syncTables(sourceTableName, targetTableName, testDir, "--doDeletes=false"); - assertTargetDoDeletesFalse(100, sourceTableName, targetTableName); + writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName); + hashSourceTable(UTIL1, sourceTableName, testDir); + Counters syncCounters = syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName, + testDir, "--doDeletes=false"); + assertTargetDoDeletesFalse(100, UTIL1, sourceTableName, UTIL1, targetTableName); assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); @@ -127,20 +149,21 @@ public void testSyncTableDoDeletesFalse() throws Exception { assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); - TEST_UTIL.deleteTable(sourceTableName); - TEST_UTIL.deleteTable(targetTableName); + UTIL1.deleteTable(sourceTableName); + UTIL1.deleteTable(targetTableName); } @Test public void testSyncTableDoPutsFalse() throws Exception { final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); - Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse"); + Path testDir = UTIL2.getDataTestDirOnTestFS(name.getMethodName()); - writeTestData(sourceTableName, targetTableName); - hashSourceTable(sourceTableName, testDir); - Counters syncCounters = syncTables(sourceTableName, targetTableName, testDir, "--doPuts=false"); - assertTargetDoPutsFalse(70, sourceTableName, targetTableName); + writeTestData(UTIL2, sourceTableName, UTIL2, targetTableName); + hashSourceTable(UTIL2, sourceTableName, testDir); + Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName, + testDir, "--doPuts=false"); + assertTargetDoPutsFalse(70, UTIL2, sourceTableName, UTIL2, targetTableName); assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); @@ -149,21 +172,21 @@ public void testSyncTableDoPutsFalse() throws Exception { assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); - TEST_UTIL.deleteTable(sourceTableName); - TEST_UTIL.deleteTable(targetTableName); + UTIL2.deleteTable(sourceTableName); + UTIL2.deleteTable(targetTableName); } @Test public void testSyncTableIgnoreTimestampsTrue() throws Exception { final TableName sourceTableName = TableName.valueOf(name.getMethodName() + "_source"); final TableName targetTableName = TableName.valueOf(name.getMethodName() + "_target"); - Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableIgnoreTimestampsTrue"); + Path testDir = UTIL1.getDataTestDirOnTestFS(name.getMethodName()); long current = EnvironmentEdgeManager.currentTime(); - writeTestData(sourceTableName, targetTableName, current - 1000, current); - hashSourceTable(sourceTableName, testDir, "--ignoreTimestamps=true"); - Counters syncCounters = - syncTables(sourceTableName, targetTableName, testDir, "--ignoreTimestamps=true"); - assertEqualTables(90, sourceTableName, targetTableName, true); + writeTestData(UTIL1, sourceTableName, UTIL2, targetTableName, current - 1000, current); + hashSourceTable(UTIL1, sourceTableName, testDir, "--ignoreTimestamps=true"); + Counters syncCounters = syncTables(UTIL2.getConfiguration(), sourceTableName, targetTableName, + testDir, "--ignoreTimestamps=true", "--sourcezkcluster=" + UTIL1.getClusterKey()); + assertEqualTables(90, UTIL1, sourceTableName, UTIL2, targetTableName, true); assertEquals(50, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); @@ -172,256 +195,202 @@ public void testSyncTableIgnoreTimestampsTrue() throws Exception { assertEquals(30, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); - TEST_UTIL.deleteTable(sourceTableName); - TEST_UTIL.deleteTable(targetTableName); + UTIL1.deleteTable(sourceTableName); + UTIL2.deleteTable(targetTableName); } - private void assertEqualTables(int expectedRows, TableName sourceTableName, - TableName targetTableName, boolean ignoreTimestamps) throws Exception { - Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); - Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); - - ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); - ResultScanner targetScanner = targetTable.getScanner(new Scan()); - - for (int i = 0; i < expectedRows; i++) { - Result sourceRow = sourceScanner.next(); - Result targetRow = targetScanner.next(); - - LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) - + " cells:" + sourceRow); - LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) - + " cells:" + targetRow); - - if (sourceRow == null) { - Assert.fail("Expected " + expectedRows + " source rows but only found " + i); - } - if (targetRow == null) { - Assert.fail("Expected " + expectedRows + " target rows but only found " + i); - } - Cell[] sourceCells = sourceRow.rawCells(); - Cell[] targetCells = targetRow.rawCells(); - if (sourceCells.length != targetCells.length) { - LOG.debug("Source cells: " + Arrays.toString(sourceCells)); - LOG.debug("Target cells: " + Arrays.toString(targetCells)); - Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length - + " cells in source table but " + targetCells.length + " cells in target table"); - } - for (int j = 0; j < sourceCells.length; j++) { - Cell sourceCell = sourceCells[j]; - Cell targetCell = targetCells[j]; - try { - if (!CellUtil.matchingRows(sourceCell, targetCell)) { - Assert.fail("Rows don't match"); - } - if (!CellUtil.matchingFamily(sourceCell, targetCell)) { - Assert.fail("Families don't match"); - } - if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { - Assert.fail("Qualifiers don't match"); - } - if (!ignoreTimestamps && !CellUtil.matchingTimestamp(sourceCell, targetCell)) { - Assert.fail("Timestamps don't match"); - } - if (!CellUtil.matchingValue(sourceCell, targetCell)) { - Assert.fail("Values don't match"); - } - } catch (Throwable t) { - LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell); - Throwables.propagate(t); - } - } + private void assertCellEquals(Cell sourceCell, Cell targetCell, BooleanSupplier checkTimestamp) { + assertTrue("Rows don't match, source: " + sourceCell + ", target: " + targetCell, + CellUtil.matchingRows(sourceCell, targetCell)); + assertTrue("Families don't match, source: " + sourceCell + ", target: " + targetCell, + CellUtil.matchingFamily(sourceCell, targetCell)); + assertTrue("Qualifiers don't match, source: " + sourceCell + ", target: " + targetCell, + CellUtil.matchingQualifier(sourceCell, targetCell)); + if (checkTimestamp.getAsBoolean()) { + assertTrue("Timestamps don't match, source: " + sourceCell + ", target: " + targetCell, + CellUtil.matchingTimestamp(sourceCell, targetCell)); } - Result sourceRow = sourceScanner.next(); - if (sourceRow != null) { - Assert.fail("Source table has more than " + expectedRows + " rows. Next row: " - + Bytes.toInt(sourceRow.getRow())); - } - Result targetRow = targetScanner.next(); - if (targetRow != null) { - Assert.fail("Target table has more than " + expectedRows + " rows. Next row: " - + Bytes.toInt(targetRow.getRow())); - } - sourceScanner.close(); - targetScanner.close(); - sourceTable.close(); - targetTable.close(); + assertTrue("Values don't match, source: " + sourceCell + ", target: " + targetCell, + CellUtil.matchingValue(sourceCell, targetCell)); } - private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName, - TableName targetTableName) throws Exception { - Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); - Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); - - ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); - ResultScanner targetScanner = targetTable.getScanner(new Scan()); - Result targetRow = targetScanner.next(); - Result sourceRow = sourceScanner.next(); - int rowsCount = 0; - while (targetRow != null) { - rowsCount++; - // only compares values for existing rows, skipping rows existing on - // target only that were not deleted given --doDeletes=false - if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { - targetRow = targetScanner.next(); - continue; - } - - LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) - + " cells:" + sourceRow); - LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) - + " cells:" + targetRow); - - Cell[] sourceCells = sourceRow.rawCells(); - Cell[] targetCells = targetRow.rawCells(); - int targetRowKey = Bytes.toInt(targetRow.getRow()); - if (targetRowKey >= 70 && targetRowKey < 80) { - if (sourceCells.length == targetCells.length) { - LOG.debug("Source cells: " + Arrays.toString(sourceCells)); - LOG.debug("Target cells: " + Arrays.toString(targetCells)); - Assert - .fail("Row " + targetRowKey + " should have more cells in " + "target than in source"); + private void assertEqualTables(int expectedRows, HBaseTestingUtility sourceCluster, + TableName sourceTableName, HBaseTestingUtility targetCluster, TableName targetTableName, + boolean ignoreTimestamps) throws Exception { + try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); + Table targetTable = targetCluster.getConnection().getTable(targetTableName); + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = targetTable.getScanner(new Scan())) { + for (int i = 0; i < expectedRows; i++) { + Result sourceRow = sourceScanner.next(); + Result targetRow = targetScanner.next(); + + LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + if (sourceRow == null) { + fail("Expected " + expectedRows + " source rows but only found " + i); } - - } else { + if (targetRow == null) { + fail("Expected " + expectedRows + " target rows but only found " + i); + } + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); if (sourceCells.length != targetCells.length) { LOG.debug("Source cells: " + Arrays.toString(sourceCells)); LOG.debug("Target cells: " + Arrays.toString(targetCells)); - Assert.fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length + fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length + " cells in source table but " + targetCells.length + " cells in target table"); } - } - for (int j = 0; j < sourceCells.length; j++) { - Cell sourceCell = sourceCells[j]; - Cell targetCell = targetCells[j]; - try { - if (!CellUtil.matchingRow(sourceCell, targetCell)) { - Assert.fail("Rows don't match"); - } - if (!CellUtil.matchingFamily(sourceCell, targetCell)) { - Assert.fail("Families don't match"); - } - if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { - Assert.fail("Qualifiers don't match"); - } - if (targetRowKey < 80 && targetRowKey >= 90) { - if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { - Assert.fail("Timestamps don't match"); - } - } - if (!CellUtil.matchingValue(sourceCell, targetCell)) { - Assert.fail("Values don't match"); - } - } catch (Throwable t) { - LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell); - Throwables.propagate(t); + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + assertCellEquals(sourceCell, targetCell, () -> !ignoreTimestamps); } } - targetRow = targetScanner.next(); - sourceRow = sourceScanner.next(); + Result sourceRow = sourceScanner.next(); + if (sourceRow != null) { + fail("Source table has more than " + expectedRows + " rows. Next row: " + + Bytes.toInt(sourceRow.getRow())); + } + Result targetRow = targetScanner.next(); + if (targetRow != null) { + fail("Target table has more than " + expectedRows + " rows. Next row: " + + Bytes.toInt(targetRow.getRow())); + } } - assertEquals("Target expected rows does not match.", expectedRows, rowsCount); - sourceScanner.close(); - targetScanner.close(); - sourceTable.close(); - targetTable.close(); } - private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName, - TableName targetTableName) throws Exception { - Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName); - Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName); - - ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); - ResultScanner targetScanner = targetTable.getScanner(new Scan()); - Result targetRow = targetScanner.next(); - Result sourceRow = sourceScanner.next(); - int rowsCount = 0; - - while (targetRow != null) { - // only compares values for existing rows, skipping rows existing on - // source only that were not added to target given --doPuts=false - if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { - sourceRow = sourceScanner.next(); - continue; - } + private void assertTargetDoDeletesFalse(int expectedRows, HBaseTestingUtility sourceCluster, + TableName sourceTableName, HBaseTestingUtility targetCluster, TableName targetTableName) + throws Exception { + try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); + Table targetTable = targetCluster.getConnection().getTable(targetTableName); - LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) - + " cells:" + sourceRow); - LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) - + " cells:" + targetRow); - - LOG.debug("rowsCount: " + rowsCount); - - Cell[] sourceCells = sourceRow.rawCells(); - Cell[] targetCells = targetRow.rawCells(); - int targetRowKey = Bytes.toInt(targetRow.getRow()); - if (targetRowKey >= 40 && targetRowKey < 60) { - LOG.debug("Source cells: " + Arrays.toString(sourceCells)); - LOG.debug("Target cells: " + Arrays.toString(targetCells)); - Assert.fail("There shouldn't exist any rows between 40 and 60, since " - + "Puts are disabled and Deletes are enabled."); - } else if (targetRowKey >= 60 && targetRowKey < 70) { - if (sourceCells.length == targetCells.length) { - LOG.debug("Source cells: " + Arrays.toString(sourceCells)); - LOG.debug("Target cells: " + Arrays.toString(targetCells)); - Assert.fail( - "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells."); + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = targetTable.getScanner(new Scan())) { + Result targetRow = targetScanner.next(); + Result sourceRow = sourceScanner.next(); + int rowsCount = 0; + while (targetRow != null) { + rowsCount++; + // only compares values for existing rows, skipping rows existing on + // target only that were not deleted given --doDeletes=false + if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { + targetRow = targetScanner.next(); + continue; } - } else if (targetRowKey >= 80 && targetRowKey < 90) { - LOG.debug("Source cells: " + Arrays.toString(sourceCells)); - LOG.debug("Target cells: " + Arrays.toString(targetCells)); - Assert.fail("There should be no rows between 80 and 90 on target, as " - + "these had different timestamps and should had been deleted."); - } else if (targetRowKey >= 90 && targetRowKey < 100) { - for (int j = 0; j < sourceCells.length; j++) { - Cell sourceCell = sourceCells[j]; - Cell targetCell = targetCells[j]; - if (CellUtil.matchingValue(sourceCell, targetCell)) { - Assert.fail("Cells values should not match for rows between " - + "90 and 100. Target row id: " + (Bytes.toInt(targetRow.getRow()))); + + LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); + int targetRowKey = Bytes.toInt(targetRow.getRow()); + if (targetRowKey >= 70 && targetRowKey < 80) { + if (sourceCells.length == targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + fail("Row " + targetRowKey + " should have more cells in " + "target than in source"); + } + + } else { + if (sourceCells.length != targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + fail("Row " + Bytes.toInt(sourceRow.getRow()) + " has " + sourceCells.length + + " cells in source table but " + targetCells.length + " cells in target table"); } } - } else { for (int j = 0; j < sourceCells.length; j++) { Cell sourceCell = sourceCells[j]; Cell targetCell = targetCells[j]; - try { - if (!CellUtil.matchingRow(sourceCell, targetCell)) { - Assert.fail("Rows don't match"); - } - if (!CellUtil.matchingFamily(sourceCell, targetCell)) { - Assert.fail("Families don't match"); - } - if (!CellUtil.matchingQualifier(sourceCell, targetCell)) { - Assert.fail("Qualifiers don't match"); - } - if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) { - Assert.fail("Timestamps don't match"); - } - if (!CellUtil.matchingValue(sourceCell, targetCell)) { - Assert.fail("Values don't match"); + assertCellEquals(sourceCell, targetCell, () -> targetRowKey < 80 && targetRowKey >= 90); + } + targetRow = targetScanner.next(); + sourceRow = sourceScanner.next(); + } + assertEquals("Target expected rows does not match.", expectedRows, rowsCount); + } + } + + private void assertTargetDoPutsFalse(int expectedRows, HBaseTestingUtility sourceCluster, + TableName sourceTableName, HBaseTestingUtility targetCluster, TableName targetTableName) + throws Exception { + try (Table sourceTable = sourceCluster.getConnection().getTable(sourceTableName); + Table targetTable = targetCluster.getConnection().getTable(targetTableName); + ResultScanner sourceScanner = sourceTable.getScanner(new Scan()); + ResultScanner targetScanner = targetTable.getScanner(new Scan())) { + Result targetRow = targetScanner.next(); + Result sourceRow = sourceScanner.next(); + int rowsCount = 0; + + while (targetRow != null) { + // only compares values for existing rows, skipping rows existing on + // source only that were not added to target given --doPuts=false + if (Bytes.toInt(sourceRow.getRow()) != Bytes.toInt(targetRow.getRow())) { + sourceRow = sourceScanner.next(); + continue; + } + + LOG.debug("SOURCE row: " + (sourceRow == null ? "null" : Bytes.toInt(sourceRow.getRow())) + + " cells:" + sourceRow); + LOG.debug("TARGET row: " + (targetRow == null ? "null" : Bytes.toInt(targetRow.getRow())) + + " cells:" + targetRow); + + LOG.debug("rowsCount: " + rowsCount); + + Cell[] sourceCells = sourceRow.rawCells(); + Cell[] targetCells = targetRow.rawCells(); + int targetRowKey = Bytes.toInt(targetRow.getRow()); + if (targetRowKey >= 40 && targetRowKey < 60) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + fail("There shouldn't exist any rows between 40 and 60, since " + + "Puts are disabled and Deletes are enabled."); + } else if (targetRowKey >= 60 && targetRowKey < 70) { + if (sourceCells.length == targetCells.length) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + fail( + "Row " + Bytes.toInt(sourceRow.getRow()) + " shouldn't have same number of cells."); + } + } else if (targetRowKey >= 80 && targetRowKey < 90) { + LOG.debug("Source cells: " + Arrays.toString(sourceCells)); + LOG.debug("Target cells: " + Arrays.toString(targetCells)); + fail("There should be no rows between 80 and 90 on target, as " + + "these had different timestamps and should had been deleted."); + } else if (targetRowKey >= 90 && targetRowKey < 100) { + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + if (CellUtil.matchingValue(sourceCell, targetCell)) { + fail("Cells values should not match for rows between " + "90 and 100. Target row id: " + + Bytes.toInt(targetRow.getRow())); } - } catch (Throwable t) { - LOG.debug("Source cell: " + sourceCell + " target cell: " + targetCell); - Throwables.propagate(t); + } + } else { + for (int j = 0; j < sourceCells.length; j++) { + Cell sourceCell = sourceCells[j]; + Cell targetCell = targetCells[j]; + assertCellEquals(sourceCell, targetCell, () -> true); } } + rowsCount++; + targetRow = targetScanner.next(); + sourceRow = sourceScanner.next(); } - rowsCount++; - targetRow = targetScanner.next(); - sourceRow = sourceScanner.next(); + assertEquals("Target expected rows does not match.", expectedRows, rowsCount); } - assertEquals("Target expected rows does not match.", expectedRows, rowsCount); - sourceScanner.close(); - targetScanner.close(); - sourceTable.close(); - targetTable.close(); } - private Counters syncTables(TableName sourceTableName, TableName targetTableName, Path testDir, - String... options) throws Exception { - SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration()); + private Counters syncTables(Configuration conf, TableName sourceTableName, + TableName targetTableName, Path testDir, String... options) throws Exception { + SyncTable syncTable = new SyncTable(conf); String[] args = Arrays.copyOf(options, options.length + 3); args[options.length] = testDir.toString(); args[options.length + 1] = sourceTableName.getNameAsString(); @@ -433,12 +402,12 @@ private Counters syncTables(TableName sourceTableName, TableName targetTableName return syncTable.counters; } - private void hashSourceTable(TableName sourceTableName, Path testDir, String... options) - throws Exception { + private void hashSourceTable(HBaseTestingUtility sourceCluster, TableName sourceTableName, + Path testDir, String... options) throws Exception { int numHashFiles = 3; long batchSize = 100; // should be 2 batches per region int scanBatch = 1; - HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration()); + HashTable hashTable = new HashTable(sourceCluster.getConfiguration()); String[] args = Arrays.copyOf(options, options.length + 5); args[options.length] = "--batchsize=" + batchSize; args[options.length + 1] = "--numhashfiles=" + numHashFiles; @@ -448,7 +417,7 @@ private void hashSourceTable(TableName sourceTableName, Path testDir, String... int code = hashTable.run(args); assertEquals("hash table job failed", 0, code); - FileSystem fs = TEST_UTIL.getTestFileSystem(); + FileSystem fs = sourceCluster.getTestFileSystem(); HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); assertEquals(sourceTableName.getNameAsString(), tableHash.tableName); @@ -459,8 +428,9 @@ private void hashSourceTable(TableName sourceTableName, Path testDir, String... LOG.info("Hash table completed"); } - private void writeTestData(TableName sourceTableName, TableName targetTableName, - long... timestamps) throws Exception { + private void writeTestData(HBaseTestingUtility sourceCluster, TableName sourceTableName, + HBaseTestingUtility targetCluster, TableName targetTableName, long... timestamps) + throws Exception { final byte[] family = Bytes.toBytes("family"); final byte[] column1 = Bytes.toBytes("c1"); final byte[] column2 = Bytes.toBytes("c2"); @@ -476,102 +446,100 @@ private void writeTestData(TableName sourceTableName, TableName targetTableName, timestamps = new long[] { current, current }; } - Table sourceTable = - TEST_UTIL.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions)); - - Table targetTable = - TEST_UTIL.createTable(targetTableName, family, generateSplits(numRows, targetRegions)); - - int rowIndex = 0; - // a bunch of identical rows - for (; rowIndex < 40; rowIndex++) { - Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamps[0], value1); - sourcePut.addColumn(family, column2, timestamps[0], value2); - sourceTable.put(sourcePut); - - Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamps[1], value1); - targetPut.addColumn(family, column2, timestamps[1], value2); - targetTable.put(targetPut); - } - // some rows only in the source table - // ROWSWITHDIFFS: 10 - // TARGETMISSINGROWS: 10 - // TARGETMISSINGCELLS: 20 - for (; rowIndex < 50; rowIndex++) { - Put put = new Put(Bytes.toBytes(rowIndex)); - put.addColumn(family, column1, timestamps[0], value1); - put.addColumn(family, column2, timestamps[0], value2); - sourceTable.put(put); - } - // some rows only in the target table - // ROWSWITHDIFFS: 10 - // SOURCEMISSINGROWS: 10 - // SOURCEMISSINGCELLS: 20 - for (; rowIndex < 60; rowIndex++) { - Put put = new Put(Bytes.toBytes(rowIndex)); - put.addColumn(family, column1, timestamps[1], value1); - put.addColumn(family, column2, timestamps[1], value2); - targetTable.put(put); - } - // some rows with 1 missing cell in target table - // ROWSWITHDIFFS: 10 - // TARGETMISSINGCELLS: 10 - for (; rowIndex < 70; rowIndex++) { - Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamps[0], value1); - sourcePut.addColumn(family, column2, timestamps[0], value2); - sourceTable.put(sourcePut); - - Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamps[1], value1); - targetTable.put(targetPut); - } - // some rows with 1 missing cell in source table - // ROWSWITHDIFFS: 10 - // SOURCEMISSINGCELLS: 10 - for (; rowIndex < 80; rowIndex++) { - Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamps[0], value1); - sourceTable.put(sourcePut); - - Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamps[1], value1); - targetPut.addColumn(family, column2, timestamps[1], value2); - targetTable.put(targetPut); - } - // some rows differing only in timestamp - // ROWSWITHDIFFS: 10 - // SOURCEMISSINGCELLS: 20 - // TARGETMISSINGCELLS: 20 - for (; rowIndex < 90; rowIndex++) { - Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamps[0], column1); - sourcePut.addColumn(family, column2, timestamps[0], value2); - sourceTable.put(sourcePut); - - Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamps[1] + 1, column1); - targetPut.addColumn(family, column2, timestamps[1] - 1, value2); - targetTable.put(targetPut); - } - // some rows with different values - // ROWSWITHDIFFS: 10 - // DIFFERENTCELLVALUES: 20 - for (; rowIndex < numRows; rowIndex++) { - Put sourcePut = new Put(Bytes.toBytes(rowIndex)); - sourcePut.addColumn(family, column1, timestamps[0], value1); - sourcePut.addColumn(family, column2, timestamps[0], value2); - sourceTable.put(sourcePut); - - Put targetPut = new Put(Bytes.toBytes(rowIndex)); - targetPut.addColumn(family, column1, timestamps[1], value3); - targetPut.addColumn(family, column2, timestamps[1], value3); - targetTable.put(targetPut); + try ( + Table sourceTable = + sourceCluster.createTable(sourceTableName, family, generateSplits(numRows, sourceRegions)); + Table targetTable = targetCluster.createTable(targetTableName, family, + generateSplits(numRows, targetRegions))) { + + int rowIndex = 0; + // a bunch of identical rows + for (; rowIndex < 40; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamps[0], value1); + sourcePut.addColumn(family, column2, timestamps[0], value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamps[1], value1); + targetPut.addColumn(family, column2, timestamps[1], value2); + targetTable.put(targetPut); + } + // some rows only in the source table + // ROWSWITHDIFFS: 10 + // TARGETMISSINGROWS: 10 + // TARGETMISSINGCELLS: 20 + for (; rowIndex < 50; rowIndex++) { + Put put = new Put(Bytes.toBytes(rowIndex)); + put.addColumn(family, column1, timestamps[0], value1); + put.addColumn(family, column2, timestamps[0], value2); + sourceTable.put(put); + } + // some rows only in the target table + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGROWS: 10 + // SOURCEMISSINGCELLS: 20 + for (; rowIndex < 60; rowIndex++) { + Put put = new Put(Bytes.toBytes(rowIndex)); + put.addColumn(family, column1, timestamps[1], value1); + put.addColumn(family, column2, timestamps[1], value2); + targetTable.put(put); + } + // some rows with 1 missing cell in target table + // ROWSWITHDIFFS: 10 + // TARGETMISSINGCELLS: 10 + for (; rowIndex < 70; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamps[0], value1); + sourcePut.addColumn(family, column2, timestamps[0], value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamps[1], value1); + targetTable.put(targetPut); + } + // some rows with 1 missing cell in source table + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGCELLS: 10 + for (; rowIndex < 80; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamps[0], value1); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamps[1], value1); + targetPut.addColumn(family, column2, timestamps[1], value2); + targetTable.put(targetPut); + } + // some rows differing only in timestamp + // ROWSWITHDIFFS: 10 + // SOURCEMISSINGCELLS: 20 + // TARGETMISSINGCELLS: 20 + for (; rowIndex < 90; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamps[0], column1); + sourcePut.addColumn(family, column2, timestamps[0], value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamps[1] + 1, column1); + targetPut.addColumn(family, column2, timestamps[1] - 1, value2); + targetTable.put(targetPut); + } + // some rows with different values + // ROWSWITHDIFFS: 10 + // DIFFERENTCELLVALUES: 20 + for (; rowIndex < numRows; rowIndex++) { + Put sourcePut = new Put(Bytes.toBytes(rowIndex)); + sourcePut.addColumn(family, column1, timestamps[0], value1); + sourcePut.addColumn(family, column2, timestamps[0], value2); + sourceTable.put(sourcePut); + + Put targetPut = new Put(Bytes.toBytes(rowIndex)); + targetPut.addColumn(family, column1, timestamps[1], value3); + targetPut.addColumn(family, column2, timestamps[1], value3); + targetTable.put(targetPut); + } } - - sourceTable.close(); - targetTable.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java index a6de320e13a9..1d3d5bf5ad33 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/MobTestUtil.java @@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; @@ -115,11 +114,17 @@ public static void assertCellsValue(Table table, Scan scan, byte[] expectedValue * @param table to get the scanner * @return the number of rows */ - public static int countMobRows(HBaseTestingUtility util, Table table) throws IOException { + public static int countMobRows(Table table) throws IOException { Scan scan = new Scan(); // Do not retrieve the mob data when scanning scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); - return util.countRows(table, scan); + try (ResultScanner results = table.getScanner(scan)) { + int count = 0; + while (results.next() != null) { + count++; + } + return count; + } } public static Path generateMOBFileForRegion(Configuration conf, TableName tableName,