diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java index 78be5af1bf4b..52f340251c2d 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java @@ -19,9 +19,6 @@ package org.apache.hadoop.hbase.mapreduce; import java.io.IOException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -41,6 +38,9 @@ import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored @@ -155,11 +155,10 @@ public void write(KEY key, Mutation value) * @param context The current task context. * @return The newly created writer instance. * @throws IOException When creating the writer fails. - * @throws InterruptedException When the jobs is cancelled. */ @Override public RecordWriter getRecordWriter(TaskAttemptContext context) - throws IOException, InterruptedException { + throws IOException { return new TableRecordWriter(); } @@ -168,18 +167,18 @@ public RecordWriter getRecordWriter(TaskAttemptContext context) * * @param context The current context. * @throws IOException When the check fails. - * @throws InterruptedException When the job is aborted. * @see OutputFormat#checkOutputSpecs(JobContext) */ @Override - public void checkOutputSpecs(JobContext context) throws IOException, - InterruptedException { + public void checkOutputSpecs(JobContext context) + throws IOException { Configuration hConf = getConf(); if (hConf == null) { hConf = context.getConfiguration(); } - try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) { + try (Connection connection = ConnectionFactory.createConnection(hConf); + Admin admin = connection.getAdmin()) { TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE)); if (!admin.tableExists(tableName)) { throw new TableNotFoundException("Can't write, table does not exist:" + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 6a6a1c48925f..b83749d9c337 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionReplicaUtil; @@ -161,25 +162,26 @@ public static void setupRegionReplicaReplication(Configuration conf) throws IOEx if (!isRegionReplicaReplicationEnabled(conf)) { return; } - Admin admin = ConnectionFactory.createConnection(conf).getAdmin(); - ReplicationPeerConfig peerConfig = null; - try { - peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER); - } catch (ReplicationPeerNotFoundException e) { - LOG.warn("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER - + " not exist", e); - } - try { + + try (Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin()) { + ReplicationPeerConfig peerConfig = null; + try { + peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER); + } catch (ReplicationPeerNotFoundException e) { + LOG.warn( + "Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER + " not exist", + e); + } + if (peerConfig == null) { LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER - + " not exist. Creating..."); + + " not exist. Creating..."); peerConfig = new ReplicationPeerConfig(); peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf)); peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()); admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig); } - } finally { - admin.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java index 398b97ac2dea..1fb9df66abb0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -64,6 +66,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase private static HBaseTestingUtility TEST_UTIL2; private static Configuration conf2; private static AsyncAdmin admin2; + private static AsyncConnection connection; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -78,14 +81,21 @@ public static void setUpBeforeClass() throws Exception { conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); TEST_UTIL2 = new HBaseTestingUtility(conf2); TEST_UTIL2.startMiniCluster(); - admin2 = - ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin(); + + connection = + ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get(); + admin2 = connection.getAdmin(); ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(TEST_UTIL2.getClusterKey()); ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join(); } + @AfterClass + public static void clearUp() throws IOException { + connection.close(); + } + @Override @After public void tearDown() throws Exception { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 10d3ea698161..5019fb3c714b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -396,12 +396,12 @@ public void testReplicaAndReplication() throws Exception { LOG.info("Setup second Zk"); HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin(); - - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(HTU2.getClusterKey()); - admin.addReplicationPeer("2", rpc); - admin.close(); + try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Admin admin = connection.getAdmin()) { + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(HTU2.getClusterKey()); + admin.addReplicationPeer("2", rpc); + } Put p = new Put(row); p.addColumn(row, row, row); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java index 126a175d77c7..74c07e419967 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.AsyncAdmin; +import org.apache.hadoop.hbase.client.AsyncConnection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; @@ -149,27 +150,29 @@ public void testClearBlockCacheFromAdmin() throws Exception { @Test public void testClearBlockCacheFromAsyncAdmin() throws Exception { - AsyncAdmin admin = - ConnectionFactory.createAsyncConnection(HTU.getConfiguration()).get().getAdmin(); - - BlockCache blockCache1 = rs1.getBlockCache().get(); - BlockCache blockCache2 = rs2.getBlockCache().get(); - long initialBlockCount1 = blockCache1.getBlockCount(); - long initialBlockCount2 = blockCache2.getBlockCount(); - - // scan will cause blocks to be added in BlockCache - scanAllRegionsForRS(rs1); - assertEquals(blockCache1.getBlockCount() - initialBlockCount1, + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(HTU.getConfiguration()) + .get()) { + AsyncAdmin admin = conn.getAdmin(); + + BlockCache blockCache1 = rs1.getBlockCache().get(); + BlockCache blockCache2 = rs2.getBlockCache().get(); + long initialBlockCount1 = blockCache1.getBlockCount(); + long initialBlockCount2 = blockCache2.getBlockCount(); + + // scan will cause blocks to be added in BlockCache + scanAllRegionsForRS(rs1); + assertEquals(blockCache1.getBlockCount() - initialBlockCount1, HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY)); - scanAllRegionsForRS(rs2); - assertEquals(blockCache2.getBlockCount() - initialBlockCount2, + scanAllRegionsForRS(rs2); + assertEquals(blockCache2.getBlockCount() - initialBlockCount2, HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY)); - CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get(); - assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU + CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get(); + assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU .getNumHFilesForRS(rs2, TABLE_NAME, FAMILY)); - assertEquals(initialBlockCount1, blockCache1.getBlockCount()); - assertEquals(initialBlockCount2, blockCache2.getBlockCount()); + assertEquals(initialBlockCount1, blockCache1.getBlockCount()); + assertEquals(initialBlockCount2, blockCache2.getBlockCount()); + } } private void scanAllRegionsForRS(HRegionServer rs) throws IOException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index fff10345cb5d..7210fc5d7ff2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -486,8 +487,8 @@ private void createTableOnClusters(TableDescriptor table) throws Exception { private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber) throws Exception { - try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) - .getAdmin()) { + try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); + Admin admin = conn.getAdmin()) { admin.addReplicationPeer(id, new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())); } @@ -495,8 +496,8 @@ private void addPeer(String id, int masterClusterNumber, private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs) throws Exception { - try (Admin admin = - ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) { + try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); + Admin admin = conn.getAdmin()) { admin.addReplicationPeer( id, new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()) @@ -506,15 +507,15 @@ private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, } private void disablePeer(String id, int masterClusterNumber) throws Exception { - try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) - .getAdmin()) { + try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); + Admin admin = conn.getAdmin()) { admin.disableReplicationPeer(id); } } private void enablePeer(String id, int masterClusterNumber) throws Exception { - try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber]) - .getAdmin()) { + try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]); + Admin admin = conn.getAdmin()) { admin.enableReplicationPeer(id); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java index a7b9e157f3d7..449ecef2bc5e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; @@ -138,73 +139,74 @@ public void testMultiSlaveReplication() throws Exception { MiniHBaseCluster master = utility1.startMiniCluster(); utility2.startMiniCluster(); utility3.startMiniCluster(); - Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin(); - - utility1.getAdmin().createTable(table); - utility2.getAdmin().createTable(table); - utility3.getAdmin().createTable(table); - Table htable1 = utility1.getConnection().getTable(tableName); - Table htable2 = utility2.getConnection().getTable(tableName); - Table htable3 = utility3.getConnection().getTable(tableName); - - ReplicationPeerConfig rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utility2.getClusterKey()); - admin1.addReplicationPeer("1", rpc); - - // put "row" and wait 'til it got around, then delete - putAndWait(row, famName, htable1, htable2); - deleteAndWait(row, htable1, htable2); - // check it wasn't replication to cluster 3 - checkRow(row,0,htable3); - - putAndWait(row2, famName, htable1, htable2); - - // now roll the region server's logs - rollWALAndWait(utility1, htable1.getName(), row2); - - // after the log was rolled put a new row - putAndWait(row3, famName, htable1, htable2); - - rpc = new ReplicationPeerConfig(); - rpc.setClusterKey(utility3.getClusterKey()); - admin1.addReplicationPeer("2", rpc); - - // put a row, check it was replicated to all clusters - putAndWait(row1, famName, htable1, htable2, htable3); - // delete and verify - deleteAndWait(row1, htable1, htable2, htable3); - - // make sure row2 did not get replicated after - // cluster 3 was added - checkRow(row2,0,htable3); - - // row3 will get replicated, because it was in the - // latest log - checkRow(row3,1,htable3); - - Put p = new Put(row); - p.addColumn(famName, row, row); - htable1.put(p); - // now roll the logs again - rollWALAndWait(utility1, htable1.getName(), row); - - // cleanup "row2", also conveniently use this to wait replication - // to finish - deleteAndWait(row2, htable1, htable2, htable3); - // Even if the log was rolled in the middle of the replication - // "row" is still replication. - checkRow(row, 1, htable2); - // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, - // we should wait before checking. - checkWithWait(row, 1, htable3); - - // cleanup the rest - deleteAndWait(row, htable1, htable2, htable3); - deleteAndWait(row3, htable1, htable2, htable3); - - utility3.shutdownMiniCluster(); - utility2.shutdownMiniCluster(); - utility1.shutdownMiniCluster(); + try (Connection conn = ConnectionFactory.createConnection(conf1); + Admin admin1 = conn.getAdmin()) { + utility1.getAdmin().createTable(table); + utility2.getAdmin().createTable(table); + utility3.getAdmin().createTable(table); + Table htable1 = utility1.getConnection().getTable(tableName); + Table htable2 = utility2.getConnection().getTable(tableName); + Table htable3 = utility3.getConnection().getTable(tableName); + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility2.getClusterKey()); + admin1.addReplicationPeer("1", rpc); + + // put "row" and wait 'til it got around, then delete + putAndWait(row, famName, htable1, htable2); + deleteAndWait(row, htable1, htable2); + // check it wasn't replication to cluster 3 + checkRow(row, 0, htable3); + + putAndWait(row2, famName, htable1, htable2); + + // now roll the region server's logs + rollWALAndWait(utility1, htable1.getName(), row2); + + // after the log was rolled put a new row + putAndWait(row3, famName, htable1, htable2); + + rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(utility3.getClusterKey()); + admin1.addReplicationPeer("2", rpc); + + // put a row, check it was replicated to all clusters + putAndWait(row1, famName, htable1, htable2, htable3); + // delete and verify + deleteAndWait(row1, htable1, htable2, htable3); + + // make sure row2 did not get replicated after + // cluster 3 was added + checkRow(row2, 0, htable3); + + // row3 will get replicated, because it was in the + // latest log + checkRow(row3, 1, htable3); + + Put p = new Put(row); + p.addColumn(famName, row, row); + htable1.put(p); + // now roll the logs again + rollWALAndWait(utility1, htable1.getName(), row); + + // cleanup "row2", also conveniently use this to wait replication + // to finish + deleteAndWait(row2, htable1, htable2, htable3); + // Even if the log was rolled in the middle of the replication + // "row" is still replication. + checkRow(row, 1, htable2); + // Replication thread of cluster 2 may be sleeping, and since row2 is not there in it, + // we should wait before checking. + checkWithWait(row, 1, htable3); + + // cleanup the rest + deleteAndWait(row, htable1, htable2, htable3); + deleteAndWait(row3, htable1, htable2, htable3); + + utility3.shutdownMiniCluster(); + utility2.shutdownMiniCluster(); + utility1.shutdownMiniCluster(); + } } private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index 8fe187b2dc5f..84d1bb92819c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -17,8 +17,13 @@ */ package org.apache.hadoop.hbase.replication; -import static org.junit.Assert.*; - +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -56,7 +61,6 @@ import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; @Category({FlakeyTests.class, LargeTests.class}) @@ -375,14 +379,13 @@ public void testTableCFsHelperConverter() { @Test public void testPerTableCFReplication() throws Exception { LOG.info("testPerTableCFReplication"); - Admin replicationAdmin = ConnectionFactory.createConnection(conf1).getAdmin(); - Connection connection1 = ConnectionFactory.createConnection(conf1); - Connection connection2 = ConnectionFactory.createConnection(conf2); - Connection connection3 = ConnectionFactory.createConnection(conf3); - try { + try (Connection connection1 = ConnectionFactory.createConnection(conf1); + Connection connection2 = ConnectionFactory.createConnection(conf2); + Connection connection3 = ConnectionFactory.createConnection(conf3); Admin admin1 = connection1.getAdmin(); Admin admin2 = connection2.getAdmin(); Admin admin3 = connection3.getAdmin(); + Admin replicationAdmin = connection1.getAdmin()) { admin1.createTable(tabA); admin1.createTable(tabB); @@ -524,10 +527,6 @@ public void testPerTableCFReplication() throws Exception { // cf 'f3' of tableC can replicated to cluster2 and cluster3 putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C); deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C); - } finally { - connection1.close(); - connection2.close(); - connection3.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java index 786bb64bb6f1..ef5776b31bff 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; - import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -54,7 +53,6 @@ import org.junit.BeforeClass; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @@ -66,7 +64,8 @@ */ public class TestReplicationBase { private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); - + private static Connection connection1; + private static Connection connection2; protected static Configuration CONF_WITH_LOCALFS; protected static Admin hbaseAdmin; @@ -244,25 +243,26 @@ private static void startClusters() throws Exception { // as a component in deciding maximum number of parallel batches to send to the peer cluster. UTIL2.startMiniCluster(NUM_SLAVES2); - hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin(); + connection1 = ConnectionFactory.createConnection(CONF1); + connection2 = ConnectionFactory.createConnection(CONF2); + hbaseAdmin = connection1.getAdmin(); TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); - Connection connection1 = ConnectionFactory.createConnection(CONF1); - Connection connection2 = ConnectionFactory.createConnection(CONF2); - try (Admin admin1 = connection1.getAdmin()) { + try ( + Admin admin1 = connection1.getAdmin(); + Admin admin2 = connection2.getAdmin()) { admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); - } - try (Admin admin2 = connection2.getAdmin()) { admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); + UTIL1.waitUntilAllRegionsAssigned(tableName); + htable1 = connection1.getTable(tableName); + UTIL2.waitUntilAllRegionsAssigned(tableName); + htable2 = connection2.getTable(tableName); } - UTIL1.waitUntilAllRegionsAssigned(tableName); - UTIL2.waitUntilAllRegionsAssigned(tableName); - htable1 = connection1.getTable(tableName); - htable2 = connection2.getTable(tableName); + } @BeforeClass @@ -366,6 +366,13 @@ public static void tearDownAfterClass() throws Exception { if (hbaseAdmin != null) { hbaseAdmin.close(); } + + if (connection2 != null) { + connection2.close(); + } + if (connection1 != null) { + connection1.close(); + } UTIL2.shutdownMiniCluster(); UTIL1.shutdownMiniCluster(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java index c07312a19dfd..1d0d055aebe7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; @@ -105,11 +106,13 @@ public static void setUpBeforeClass() throws Exception { utility1.startMiniCluster(); utility2.startMiniCluster(); - Admin admin1 = ConnectionFactory.createConnection(conf1).getAdmin(); - admin1.addReplicationPeer("peer1", rpc); - admin1.addReplicationPeer("peer2", rpc); - admin1.addReplicationPeer("peer3", rpc); - numOfPeer = admin1.listReplicationPeers().size(); + try (Connection connection = ConnectionFactory.createConnection(utility1.getConfiguration()); + Admin admin1 = connection.getAdmin()) { + admin1.addReplicationPeer("peer1", rpc); + admin1.addReplicationPeer("peer2", rpc); + admin1.addReplicationPeer("peer3", rpc); + numOfPeer = admin1.listReplicationPeers().size(); + } } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java index c29149da89cf..ced2b6c8e198 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java @@ -53,7 +53,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; -import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.FlakeyTests; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -122,100 +121,102 @@ public static void afterClass() throws Exception { } @Test - public void testRegionReplicaReplicationPeerIsCreated() throws IOException, ReplicationException { + public void testRegionReplicaReplicationPeerIsCreated() throws IOException { // create a table with region replicas. Check whether the replication peer is created // and replication started. - Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin(); - String peerId = "region_replica_replication"; - - ReplicationPeerConfig peerConfig = null; - try { - peerConfig = admin.getReplicationPeerConfig(peerId); - } catch (ReplicationPeerNotFoundException e) { - LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); - } + try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Admin admin = connection.getAdmin()) { + String peerId = "region_replica_replication"; + + ReplicationPeerConfig peerConfig = null; + try { + peerConfig = admin.getReplicationPeerConfig(peerId); + } catch (ReplicationPeerNotFoundException e) { + LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); + } - if (peerConfig != null) { - admin.removeReplicationPeer(peerId); - peerConfig = null; - } + if (peerConfig != null) { + admin.removeReplicationPeer(peerId); + peerConfig = null; + } - HTableDescriptor htd = HTU.createTableDescriptor( - TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"), - HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, - HColumnDescriptor.DEFAULT_KEEP_DELETED); - HTU.getAdmin().createTable(htd); - try { - peerConfig = admin.getReplicationPeerConfig(peerId); - fail("Should throw ReplicationException, because replication peer id=" + peerId + HTableDescriptor htd = HTU + .createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"), + HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_KEEP_DELETED); + HTU.getAdmin().createTable(htd); + try { + peerConfig = admin.getReplicationPeerConfig(peerId); + fail("Should throw ReplicationException, because replication peer id=" + peerId + " not exist"); - } catch (ReplicationPeerNotFoundException e) { - } - assertNull(peerConfig); + } catch (ReplicationPeerNotFoundException e) { + } + assertNull(peerConfig); - htd = HTU.createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"), - HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, - HColumnDescriptor.DEFAULT_KEEP_DELETED); - htd.setRegionReplication(2); - HTU.getAdmin().createTable(htd); + htd = HTU.createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"), + HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_KEEP_DELETED); + htd.setRegionReplication(2); + HTU.getAdmin().createTable(htd); - // assert peer configuration is correct - peerConfig = admin.getReplicationPeerConfig(peerId); - assertNotNull(peerConfig); - assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( - HTU.getConfiguration())); - assertEquals(RegionReplicaReplicationEndpoint.class.getName(), + // assert peer configuration is correct + peerConfig = admin.getReplicationPeerConfig(peerId); + assertNotNull(peerConfig); + assertEquals(peerConfig.getClusterKey(), + ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration())); + assertEquals(RegionReplicaReplicationEndpoint.class.getName(), peerConfig.getReplicationEndpointImpl()); - admin.close(); + } } @Test public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception { // modify a table by adding region replicas. Check whether the replication peer is created // and replication started. - Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin(); - String peerId = "region_replica_replication"; + try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration()); + Admin admin = connection.getAdmin()) { + String peerId = "region_replica_replication"; + + ReplicationPeerConfig peerConfig = null; + try { + peerConfig = admin.getReplicationPeerConfig(peerId); + } catch (ReplicationPeerNotFoundException e) { + LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); + } - ReplicationPeerConfig peerConfig = null; - try { - peerConfig = admin.getReplicationPeerConfig(peerId); - } catch (ReplicationPeerNotFoundException e) { - LOG.warn("Region replica replication peer id=" + peerId + " not exist", e); - } + if (peerConfig != null) { + admin.removeReplicationPeer(peerId); + peerConfig = null; + } - if (peerConfig != null) { - admin.removeReplicationPeer(peerId); - peerConfig = null; - } + HTableDescriptor htd = HTU.createTableDescriptor( + TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"), + HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, + HColumnDescriptor.DEFAULT_KEEP_DELETED); + HTU.getAdmin().createTable(htd); - HTableDescriptor htd = HTU.createTableDescriptor( - TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"), - HColumnDescriptor.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER, - HColumnDescriptor.DEFAULT_KEEP_DELETED); - HTU.getAdmin().createTable(htd); + // assert that replication peer is not created yet + try { + peerConfig = admin.getReplicationPeerConfig(peerId); + fail("Should throw ReplicationException, because replication peer id=" + peerId + + " not exist"); + } catch (ReplicationPeerNotFoundException e) { + } + assertNull(peerConfig); - // assert that replication peer is not created yet - try { + HTU.getAdmin().disableTable(htd.getTableName()); + htd.setRegionReplication(2); + HTU.getAdmin().modifyTable(htd); + HTU.getAdmin().enableTable(htd.getTableName()); + + // assert peer configuration is correct peerConfig = admin.getReplicationPeerConfig(peerId); - fail("Should throw ReplicationException, because replication peer id=" + peerId - + " not exist"); - } catch (ReplicationPeerNotFoundException e) { - } - assertNull(peerConfig); - - HTU.getAdmin().disableTable(htd.getTableName()); - htd.setRegionReplication(2); - HTU.getAdmin().modifyTable(htd); - HTU.getAdmin().enableTable(htd.getTableName()); - - // assert peer configuration is correct - peerConfig = admin.getReplicationPeerConfig(peerId); - assertNotNull(peerConfig); - assertEquals(peerConfig.getClusterKey(), ZKConfig.getZooKeeperClusterKey( - HTU.getConfiguration())); - assertEquals(RegionReplicaReplicationEndpoint.class.getName(), + assertNotNull(peerConfig); + assertEquals(peerConfig.getClusterKey(), + ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration())); + assertEquals(RegionReplicaReplicationEndpoint.class.getName(), peerConfig.getReplicationEndpointImpl()); - admin.close(); + } } public void testRegionReplicaReplication(int regionReplication) throws Exception {