diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index 7d59984540e2..f58dfba405a8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED; +import static org.apache.hadoop.hbase.HConstants.STATUS_PUBLISHED_DEFAULT; +import static org.apache.hadoop.hbase.client.ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS; +import static org.apache.hadoop.hbase.client.ClusterStatusListener.STATUS_LISTENER_CLASS; import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR; import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey; import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY; @@ -112,6 +116,8 @@ class AsyncConnectionImpl implements AsyncConnection { private final Optional<MetricsConnection> metrics; + private final ClusterStatusListener clusterStatusListener; + public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId, User user) { this.conf = conf; @@ -140,6 +146,31 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl } this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf)); this.backoffPolicy = ClientBackoffPolicyFactory.create(conf); + ClusterStatusListener listener = null; + if (conf.getBoolean(STATUS_PUBLISHED, STATUS_PUBLISHED_DEFAULT)) { + // TODO: this maybe a blocking operation, better to create it outside the constructor and pass + // it in, just like clusterId. Not a big problem for now as the default value is false.
+ Class<? extends ClusterStatusListener.Listener> listenerClass = conf.getClass( + STATUS_LISTENER_CLASS, DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); + if (listenerClass == null) { + LOG.warn("{} is true, but {} is not set", STATUS_PUBLISHED, STATUS_LISTENER_CLASS); + } else { + try { + listener = new ClusterStatusListener( + new ClusterStatusListener.DeadServerHandler() { + @Override + public void newDead(ServerName sn) { + locator.clearCache(sn); + rpcClient.cancelConnections(sn); + } + }, conf, listenerClass); + } catch (IOException e) { + LOG.warn("Failed to create ClusterStatusListener, not a critical problem, ignoring...", + e); + } + } + } + this.clusterStatusListener = listener; } private void spawnRenewalChore(final UserGroupInformation user) { @@ -159,6 +190,7 @@ public void close() { if (closed) { return; } + IOUtils.closeQuietly(clusterStatusListener); IOUtils.closeQuietly(rpcClient); IOUtils.closeQuietly(registry); if (authService != null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java index 54bf9ff38f21..9cf8bc651d66 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; /** @@ -113,4 +114,23 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { void clearCache() { metaRegionLocations.set(null); } + + void clearCache(ServerName serverName) { + for (;;) { + RegionLocations locs = metaRegionLocations.get(); + if (locs == null) { + return; + } + RegionLocations newLocs = locs.removeByServer(serverName); + if (locs
== newLocs) { + return; + } + if (newLocs.isEmpty()) { + newLocs = null; + } + if (metaRegionLocations.compareAndSet(locs, newLocs)) { + return; + } + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java index 069a324fa949..fd8fcdb3d4a3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Scan.ReadType; @@ -617,6 +618,24 @@ void clearCache() { cache.clear(); } + void clearCache(ServerName serverName) { + for (TableCache tableCache : cache.values()) { + for (Map.Entry<byte[], RegionLocations> entry : tableCache.cache.entrySet()) { + byte[] regionName = entry.getKey(); + RegionLocations locs = entry.getValue(); + RegionLocations newLocs = locs.removeByServer(serverName); + if (locs == newLocs) { + continue; + } + if (newLocs.isEmpty()) { + tableCache.cache.remove(regionName, locs); + } else { + tableCache.cache.replace(regionName, locs, newLocs); + } + } + } + } + // only used for testing whether we have cached the location for a region.
@VisibleForTesting RegionLocations getRegionLocationInCache(TableName tableName, byte[] row) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java index 3eb44b71142c..9e1d5e8f5a65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java @@ -25,6 +25,7 @@ import java.util.function.Supplier; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.util.Bytes; @@ -33,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; @@ -46,11 +48,14 @@ class AsyncRegionLocator { private final HashedWheelTimer retryTimer; + private final AsyncConnectionImpl conn; + private final AsyncMetaRegionLocator metaRegionLocator; private final AsyncNonMetaRegionLocator nonMetaRegionLocator; AsyncRegionLocator(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) { + this.conn = conn; this.metaRegionLocator = new AsyncMetaRegionLocator(conn.registry); this.nonMetaRegionLocator = new AsyncNonMetaRegionLocator(conn); this.retryTimer = retryTimer; @@ -150,9 +155,7 @@ void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) { } void clearCache(TableName tableName) { - if (LOG.isDebugEnabled()) { - LOG.debug("Clear meta cache for " + tableName); - } + LOG.debug("Clear meta cache for {}", tableName); if (tableName.equals(META_TABLE_NAME)) { metaRegionLocator.clearCache(); } else { @@ -160,8 +163,20 
@@ void clearCache(TableName tableName) { } } + void clearCache(ServerName serverName) { + LOG.debug("Clear meta cache for {}", serverName); + metaRegionLocator.clearCache(serverName); + nonMetaRegionLocator.clearCache(serverName); + conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer); + } + void clearCache() { metaRegionLocator.clearCache(); nonMetaRegionLocator.clearCache(); } + + @VisibleForTesting + AsyncNonMetaRegionLocator getNonMetaRegionLocator() { + return nonMetaRegionLocator; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java new file mode 100644 index 000000000000..849feb8f2a70 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRSCrashPublish.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertNotEquals; + +import java.io.IOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestAsyncTableRSCrashPublish { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableRSCrashPublish.class); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static AsyncConnectionImpl CONN; + + private static TableName TABLE_NAME = TableName.valueOf("Publish"); + + private static byte[] FAMILY = Bytes.toBytes("family"); + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); + UTIL.startMiniCluster(2); + UTIL.createTable(TABLE_NAME, FAMILY); + UTIL.waitTableAvailable(TABLE_NAME); + CONN = + (AsyncConnectionImpl) ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDown() throws Exception { + Closeables.close(CONN, true); + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws IOException { + AsyncNonMetaRegionLocator locator = CONN.getLocator().getNonMetaRegionLocator(); + CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join(); + ServerName serverName = 
locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW) + .getDefaultRegionLocation().getServerName(); + UTIL.getMiniHBaseCluster().stopRegionServer(serverName); + UTIL.waitFor(60000, + () -> locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW) == null); + CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).join(); + assertNotEquals(serverName, + locator.getRegionLocationInCache(TABLE_NAME, HConstants.EMPTY_START_ROW) + .getDefaultRegionLocation().getServerName()); + } +}