HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2162)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
sandeepvinayak authored Jul 29, 2020
1 parent 5f27a00 commit d65fb87
Showing 11 changed files with 253 additions and 226 deletions.
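
The core problem this commit fixes: `ConnectionFactory.createConnection(conf).getAdmin()` creates a Connection that is immediately discarded, and closing the Admin does not close the Connection behind it. A minimal sketch of the leaky pattern and the fix applied throughout the diff below (illustrative, not part of the patch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class ConnectionLeakSketch {
  // Leaky: the Connection created inline becomes unreachable after getAdmin(),
  // and Admin.close() does not close the Connection that produced it.
  static void leaky(Configuration conf) throws Exception {
    try (Admin admin = ConnectionFactory.createConnection(conf).getAdmin()) {
      // ... use admin ...
    } // admin is closed here, but its underlying Connection never is
  }

  // Fixed: declare the Connection as its own try-with-resources variable so
  // both resources are closed, in reverse order of declaration.
  static void fixed(Configuration conf) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      // ... use admin ...
    }
  }
}
```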
@@ -19,9 +19,6 @@
 package org.apache.hadoop.hbase.mapreduce;

 import java.io.IOException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -41,6 +38,9 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -155,11 +155,10 @@ public void write(KEY key, Mutation value)
    * @param context The current task context.
    * @return The newly created writer instance.
    * @throws IOException When creating the writer fails.
-   * @throws InterruptedException When the jobs is cancelled.
    */
   @Override
   public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
-      throws IOException, InterruptedException {
+      throws IOException {
     return new TableRecordWriter();
   }

@@ -168,18 +167,18 @@ public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
    *
    * @param context The current context.
    * @throws IOException When the check fails.
-   * @throws InterruptedException When the job is aborted.
    * @see OutputFormat#checkOutputSpecs(JobContext)
    */
   @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
+  public void checkOutputSpecs(JobContext context)
+      throws IOException {
     Configuration hConf = getConf();
     if (hConf == null) {
       hConf = context.getConfiguration();
     }

-    try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
+    try (Connection connection = ConnectionFactory.createConnection(hConf);
+        Admin admin = connection.getAdmin()) {
       TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
       if (!admin.tableExists(tableName)) {
         throw new TableNotFoundException("Can't write, table does not exist:" +
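
Dropping InterruptedException from the overridden signatures above is safe at the language level: an overriding method may declare fewer checked exceptions than the method it overrides. A hedged illustration (Base and Impl are made-up names standing in for OutputFormat and its subclass):

```java
import java.io.IOException;

abstract class Base {
  // Hypothetical stand-in for OutputFormat's contract.
  abstract void check() throws IOException, InterruptedException;
}

class Impl extends Base {
  @Override
  void check() throws IOException { // legal: a narrower throws clause
  }
}
```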
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -161,25 +162,26 @@ public static void setupRegionReplicaReplication(Configuration conf) throws IOException
     if (!isRegionReplicaReplicationEnabled(conf)) {
       return;
     }
-    Admin admin = ConnectionFactory.createConnection(conf).getAdmin();
-    ReplicationPeerConfig peerConfig = null;
-    try {
-      peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
-    } catch (ReplicationPeerNotFoundException e) {
-      LOG.warn("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
-        + " not exist", e);
-    }
-    try {
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+        Admin admin = connection.getAdmin()) {
+      ReplicationPeerConfig peerConfig = null;
+      try {
+        peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
+      } catch (ReplicationPeerNotFoundException e) {
+        LOG.warn(
+          "Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER + " not exist",
+          e);
+      }
+
       if (peerConfig == null) {
         LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
-          + " not exist. Creating...");
+            + " not exist. Creating...");
         peerConfig = new ReplicationPeerConfig();
         peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
         peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
         admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig);
       }
-    } finally {
-      admin.close();
-    }
+    }
   }

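
The hunk above replaces a try/finally that only closed the Admin with a try-with-resources over both objects. Roughly what the new form expands to (a sketch of the compiler's desugaring, simplified to omit suppressed-exception handling):

```java
Connection connection = ConnectionFactory.createConnection(conf);
try {
  Admin admin = connection.getAdmin();
  try {
    // ... peer lookup and creation as in the hunk above ...
  } finally {
    admin.close(); // inner resource closed first
  }
} finally {
  connection.close(); // then the Connection, even if getAdmin() or the body threw
}
```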
@@ -23,6 +23,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,6 +42,7 @@
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -64,6 +66,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
   private static HBaseTestingUtility TEST_UTIL2;
   private static Configuration conf2;
   private static AsyncAdmin admin2;
+  private static AsyncConnection connection;

   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -78,14 +81,21 @@ public static void setUpBeforeClass() throws Exception {
     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
     TEST_UTIL2 = new HBaseTestingUtility(conf2);
     TEST_UTIL2.startMiniCluster();
-    admin2 =
-      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
+
+    connection =
+      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
+    admin2 = connection.getAdmin();
+
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(TEST_UTIL2.getClusterKey());
     ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
   }

+  @AfterClass
+  public static void clearUp() throws IOException {
+    connection.close();
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
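
For test fixtures that need an admin across many test methods, the commit keeps the connection in a static field instead of discarding it, then closes it once in @AfterClass. A condensed sketch of that pattern (field and method names match the hunk above; the surrounding class and cluster setup are elided):

```java
private static AsyncConnection connection;
private static AsyncAdmin admin2;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
  // Keep a reference to the AsyncConnection so it can be closed later.
  connection = ConnectionFactory.createAsyncConnection(conf2).get();
  admin2 = connection.getAdmin();
}

@AfterClass
public static void clearUp() throws IOException {
  // Closing the connection releases everything admin2 was using.
  connection.close();
}
```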
@@ -396,12 +396,12 @@ public void testReplicaAndReplication() throws Exception {
     LOG.info("Setup second Zk");
     HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);

-    Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
-
-    ReplicationPeerConfig rpc = new ReplicationPeerConfig();
-    rpc.setClusterKey(HTU2.getClusterKey());
-    admin.addReplicationPeer("2", rpc);
-    admin.close();
+    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+        Admin admin = connection.getAdmin()) {
+      ReplicationPeerConfig rpc = new ReplicationPeerConfig();
+      rpc.setClusterKey(HTU2.getClusterKey());
+      admin.addReplicationPeer("2", rpc);
+    }

     Put p = new Put(row);
     p.addColumn(row, row, row);
@@ -31,6 +31,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -149,27 +150,29 @@ public void testClearBlockCacheFromAdmin() throws Exception {

   @Test
   public void testClearBlockCacheFromAsyncAdmin() throws Exception {
-    AsyncAdmin admin =
-      ConnectionFactory.createAsyncConnection(HTU.getConfiguration()).get().getAdmin();
-
-    BlockCache blockCache1 = rs1.getBlockCache().get();
-    BlockCache blockCache2 = rs2.getBlockCache().get();
-    long initialBlockCount1 = blockCache1.getBlockCount();
-    long initialBlockCount2 = blockCache2.getBlockCount();
-
-    // scan will cause blocks to be added in BlockCache
-    scanAllRegionsForRS(rs1);
-    assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
+    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(HTU.getConfiguration())
+      .get()) {
+      AsyncAdmin admin = conn.getAdmin();
+
+      BlockCache blockCache1 = rs1.getBlockCache().get();
+      BlockCache blockCache2 = rs2.getBlockCache().get();
+      long initialBlockCount1 = blockCache1.getBlockCount();
+      long initialBlockCount2 = blockCache2.getBlockCount();
+
+      // scan will cause blocks to be added in BlockCache
+      scanAllRegionsForRS(rs1);
+      assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
         HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
-    scanAllRegionsForRS(rs2);
-    assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
+      scanAllRegionsForRS(rs2);
+      assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
         HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));

-    CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
-    assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU
+      CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
+      assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU
         .getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
-    assertEquals(initialBlockCount1, blockCache1.getBlockCount());
-    assertEquals(initialBlockCount2, blockCache2.getBlockCount());
+      assertEquals(initialBlockCount1, blockCache1.getBlockCount());
+      assertEquals(initialBlockCount2, blockCache2.getBlockCount());
+    }
   }

   private void scanAllRegionsForRS(HRegionServer rs) throws IOException {
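
The async variant has the same shape as the synchronous fix: createAsyncConnection(conf) returns a CompletableFuture of an AsyncConnection, and the AsyncConnection obtained from get() is the closeable resource; AsyncAdmin itself holds no separate handle to close. A minimal sketch, assuming a simple listTableNames() call for brevity:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

class AsyncAdminSketch {
  static void listTables(Configuration conf) throws Exception {
    // The AsyncConnection is the resource to manage; AsyncAdmin is a view on it.
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
      AsyncAdmin admin = conn.getAdmin();
      admin.listTableNames().get().forEach(System.out::println);
    }
  }
}
```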
@@ -43,6 +43,7 @@
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -486,17 +487,17 @@ private void createTableOnClusters(TableDescriptor table) throws Exception {

   private void addPeer(String id, int masterClusterNumber,
       int slaveClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-      .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+        Admin admin = conn.getAdmin()) {
       admin.addReplicationPeer(id,
         new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
     }
   }

   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
-    try (Admin admin =
-      ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+        Admin admin = conn.getAdmin()) {
       admin.addReplicationPeer(
         id,
         new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
@@ -506,15 +507,15 @@ private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
     }
   }

   private void disablePeer(String id, int masterClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-      .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+        Admin admin = conn.getAdmin()) {
       admin.disableReplicationPeer(id);
     }
   }

   private void enablePeer(String id, int masterClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-      .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+        Admin admin = conn.getAdmin()) {
       admin.enableReplicationPeer(id);
     }
   }
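
The four helpers above repeat the same two-resource try block. If one wanted to factor it out, a small functional wrapper would do; this withAdmin helper and its ThrowingConsumer interface are hypothetical, not part of the patch:

```java
@FunctionalInterface
interface ThrowingConsumer<T> {
  void accept(T t) throws Exception;
}

// Hypothetical helper: run an action against a short-lived Admin,
// guaranteeing both the Admin and its Connection are closed.
private void withAdmin(Configuration conf, ThrowingConsumer<Admin> action) throws Exception {
  try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
    action.accept(admin);
  }
}

// Usage, e.g. in enablePeer:
//   withAdmin(configurations[masterClusterNumber], a -> a.enableReplicationPeer(id));
```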
