Skip to content

Commit

Permalink
HBASE-22261 Make use of ClusterStatusListener for async client
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Apr 20, 2019
1 parent 353f922 commit 3637bbb
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED;
import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT;
import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS;
import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
Expand Down Expand Up @@ -107,6 +111,8 @@ class AsyncConnectionImpl implements AsyncConnection {

private final Optional<MetricsConnection> metrics;

private final ClusterStatusListener clusterStatusListener;

public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
User user) {
this.conf = conf;
Expand All @@ -133,6 +139,31 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl
} else {
nonceGenerator = NO_NONCE_GENERATOR;
}
ClusterStatusListener listener = null;
if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) {
// XXX: this may be a blocking operation, better to create it outside the constructor and pass
// it in, just like clusterId. Not a big problem for now as the default value is false.
Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass(
STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
if (listenerClass == null) {
LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS);
} else {
try {
listener = new ClusterStatusListener(
new ClusterStatusListener.DeadServerHandler() {
@Override
public void newDead(ServerName sn) {
locator.clearCache(sn);
rpcClient.cancelConnections(sn);
}
}, conf, listenerClass);
} catch (IOException e) {
LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...",
e);
}
}
}
this.clusterStatusListener = listener;
}

private void spawnRenewalChore(final UserGroupInformation user) {
Expand All @@ -152,6 +183,7 @@ public void close() {
if (closed) {
return;
}
IOUtils.closeQuietly(clusterStatusListener);
IOUtils.closeQuietly(rpcClient);
IOUtils.closeQuietly(registry);
if (authService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -113,4 +114,23 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
/**
 * Drops the cached meta region locations by resetting the {@code metaRegionLocations} reference
 * to {@code null}.
 */
void clearCache() {
metaRegionLocations.set(null);
}

/**
 * Removes the given server from the cached meta region locations.
 * <p>
 * The update is applied with a compare-and-set retry loop: if another thread swaps the cached
 * value between our read and our write, the whole read/modify/write cycle is repeated.
 * @param serverName the server whose cached meta locations should be dropped
 */
void clearCache(ServerName serverName) {
  RegionLocations current;
  RegionLocations replacement;
  do {
    current = metaRegionLocations.get();
    if (current == null) {
      // Nothing is cached, so there is nothing to clear.
      return;
    }
    RegionLocations pruned = current.removeByServer(serverName);
    if (pruned == current) {
      // removeByServer handed back the same instance, which is treated as "nothing to remove".
      return;
    }
    // An empty result is stored as null rather than as an empty RegionLocations.
    replacement = pruned.isEmpty() ? null : pruned;
  } while (!metaRegionLocations.compareAndSet(current, replacement));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Scan.ReadType;
Expand Down Expand Up @@ -617,6 +618,24 @@ void clearCache() {
cache.clear();
}

/**
 * Walks every per-table cache and strips the given server out of each cached
 * {@link RegionLocations}.
 * <p>
 * Entries are updated with the conditional {@code remove(key, value)} / {@code replace(key, old,
 * new)} forms, so an entry is only touched if it still maps to the value that was read here.
 * @param serverName the server whose cached locations should be dropped
 */
void clearCache(ServerName serverName) {
  for (TableCache perTable : cache.values()) {
    for (Map.Entry<byte[], RegionLocations> cached : perTable.cache.entrySet()) {
      RegionLocations before = cached.getValue();
      RegionLocations after = before.removeByServer(serverName);
      if (after != before) {
        byte[] key = cached.getKey();
        if (after.isEmpty()) {
          // No location left for this entry, drop it entirely.
          perTable.cache.remove(key, before);
        } else {
          perTable.cache.replace(key, before, after);
        }
      }
    }
  }
}

// only used for testing whether we have cached the location for a region.
@VisibleForTesting
RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.util.Bytes;
Expand All @@ -33,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;

Expand All @@ -46,11 +48,14 @@ class AsyncRegionLocator {

private final HashedWheelTimer retryTimer;

private final AsyncConnectionImpl conn;

private final AsyncMetaRegionLocator metaRegionLocator;

private final AsyncNonMetaRegionLocator nonMetaRegionLocator;

AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
this.conn = conn;
this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry);
this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn);
this.retryTimer = retryTimer;
Expand Down Expand Up @@ -150,18 +155,28 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
}

/**
 * Clears the cached region locations for the given table.
 * <p>
 * The meta table is handled by {@code metaRegionLocator}; every other table is delegated to
 * {@code nonMetaRegionLocator}.
 * @param tableName the table whose cached locations should be dropped
 */
void clearCache(TableName tableName) {
  // Parameterized logging defers message formatting, so no isDebugEnabled() guard is required.
  LOG.debug("Clear meta cache for {}", tableName);
  if (tableName.equals(META_TABLE_NAME)) {
    metaRegionLocator.clearCache();
  } else {
    nonMetaRegionLocator.clearCache(tableName);
  }
}

/**
 * Clears every cached location that refers to the given server, first in the meta locator and
 * then in the non-meta locator, and finally bumps the "meta cache cleared for server" counter on
 * the connection metrics when metrics are present.
 * @param serverName the server whose cached locations should be dropped
 */
void clearCache(ServerName serverName) {
  LOG.debug("Clear meta cache for {}", serverName);
  metaRegionLocator.clearCache(serverName);
  nonMetaRegionLocator.clearCache(serverName);
  conn.getConnectionMetrics().ifPresent(metrics -> metrics.incrMetaCacheNumClearServer());
}

/**
 * Clears the whole location cache by delegating to both the meta and the non-meta region locator.
 */
void clearCache() {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
}

/**
 * Exposes the underlying {@link AsyncNonMetaRegionLocator}; marked as visible for testing only.
 * @return the non-meta region locator held by this locator
 */
@VisibleForTesting
AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
return nonMetaRegionLocator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.*;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Checks that an async connection drops its cached region location after the region server
 * hosting that region is stopped, with {@link HConstants#STATUS_PUBLISHED} enabled on the cluster
 * configuration.
 */
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTableRSCrashPublish {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncTableRSCrashPublish.class);

  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

  // Assigned in setUp(), hence not final.
  private static AsyncConnectionImpl CONN;

  private static final TableName TABLE_NAME = TableName.valueOf("Publish");

  private static final byte[] FAMILY = Bytes.toBytes("family");

  /**
   * Enables status publishing, starts a two-node mini cluster, creates the test table and opens
   * the async connection under test.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
    UTIL.startMiniCluster(2);
    UTIL.createTable(TABLE_NAME, FAMILY);
    UTIL.waitTableAvailable(TABLE_NAME);
    CONN =
      (AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
  }

  /**
   * Closes the connection (swallowing any IOException) and shuts the mini cluster down.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }

  @Test
  public void test() throws IOException {
    AsyncNonMetaRegionLocator locator = CONN.getLocator().getNonMetaRegionLocator();
    // Issue a read so that the region location gets cached on the client side.
    CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
    ServerName serverName = locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
      .getDefaultRegionLocation().getServerName();
    // Stop the server that currently hosts the region.
    UTIL.getMiniHBaseCluster().stopRegionServer(serverName);
    // Wait (up to 60s) for the cached location to disappear without issuing any request;
    // presumably this is driven by the published cluster status.
    UTIL.waitFor(60000,
      () -> locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW) == null);
    // A fresh read must relocate the region, and the new location must be on another server.
    CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join();
    assertNotEquals(serverName,
      locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW)
        .getDefaultRegionLocation().getServerName());
  }
}

0 comments on commit 3637bbb

Please sign in to comment.