diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index 53859c2ac6a2..e809c4c5eb25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -63,7 +63,7 @@ public class ConnectionConfiguration { // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; - /** + /** * Constructor * @param conf Configuration object */ @@ -208,5 +208,4 @@ public boolean isClientScannerAsyncPrefetch() { public int getRpcTimeout() { return rpcTimeout; } - } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 06e243bf4c45..d5620b57d300 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -865,13 +865,15 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool } // Query the meta region long pauseBase = this.pause; - userRegionLock.lock(); + takeUserRegionLock(); try { - if (useCache) {// re-check cache after get lock - RegionLocations locations = getCachedLocation(tableName, row); - if (locations != null && locations.getRegionLocation(replicaId) != null) { - return locations; - } + // We don't need to check if useCache is enabled or not. Even if useCache is false + // we already cleared the cache for this row before acquiring userRegion lock so if this + // row is present in cache that means some other thread has populated it while we were + // waiting to acquire user region lock. 
+ RegionLocations locations = getCachedLocation(tableName, row); + if (locations != null && locations.getRegionLocation(replicaId) != null) { + return locations; } if (relocateMeta) { relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW, @@ -894,7 +896,7 @@ rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSeco } tableNotFound = false; // convert the row result into the HRegionLocation we need! - RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow); + locations = MetaTableAccessor.getRegionLocations(regionInfoRow); if (locations == null || locations.getRegionLocation(replicaId) == null) { throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow); } @@ -970,6 +972,24 @@ rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSeco } } + /** + * Acquires the user region lock, waiting at most the configured meta operation timeout. + * @throws LockTimeoutException if the lock is not obtained within the wait time + * @throws IOException if the wait is interrupted (as produced by ExceptionUtil.asInterrupt) + */ + void takeUserRegionLock() throws IOException { + try { + long waitTime = connectionConfig.getMetaOperationTimeout(); + if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) { + throw new LockTimeoutException("Failed to get user region lock in " + + waitTime + " ms for accessing meta region server."); + } + } catch (InterruptedException ie) { + LOG.error("Interrupted while waiting for a lock", ie); + throw ExceptionUtil.asInterrupt(ie); + } + } + /** * Put a newly discovered HRegionLocation into the cache. * @param tableName The table name. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LockTimeoutException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LockTimeoutException.java new file mode 100644 index 000000000000..b949f0e2ecb9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/LockTimeoutException.java @@ -0,0 +1,32 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.hbase.HBaseIOException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Thrown whenever we are not able to get the lock within the specified wait time. + */ +@InterfaceAudience.Public +public class LockTimeoutException extends HBaseIOException { + public LockTimeoutException(String message) { + super(message); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java index 3870244f4cf1..9dc896832493 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaCache.java @@ -58,6 +58,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Category({MediumTests.class, ClientTests.class}) public class TestMetaCache { @@ -70,8 +72,8 @@ public class TestMetaCache { private static final TableName TABLE_NAME = TableName.valueOf("test_table"); private static
final byte[] FAMILY = Bytes.toBytes("fam1"); private static final byte[] QUALIFIER = Bytes.toBytes("qual"); - private static HRegionServer badRS; + private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class); /** * @throws java.lang.Exception @@ -369,4 +371,77 @@ public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest throws ServiceException { } } + + @Test + public void testUserRegionLockThrowsException() throws IOException, InterruptedException { + ((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector()); + Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000); + + try (ConnectionImplementation conn = + (ConnectionImplementation) ConnectionFactory.createConnection(conf)) { + ClientThread client1 = new ClientThread(conn); + ClientThread client2 = new ClientThread(conn); + client1.start(); + client2.start(); + client1.join(); + client2.join(); + // One thread will get the lock but will sleep in LockSleepInjector#throwOnScan and + // eventually fail since the sleep time is more than hbase client scanner timeout period. + // Other thread will wait to acquire userRegionLock. + // Have no idea which thread will be scheduled first. So need to check both threads. + + // Both the threads will throw exception. One thread will throw exception since after + // acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period + // is 2 seconds. + // Other thread will throw exception since it was not able to get hold of user region lock + // within meta operation timeout period.
+ assertNotNull(client1.getException()); + assertNotNull(client2.getException()); + + assertTrue(client1.getException() instanceof LockTimeoutException + ^ client2.getException() instanceof LockTimeoutException); + } + } + + private final class ClientThread extends Thread { + private Exception exception; + private ConnectionImplementation connection; + + private ClientThread(ConnectionImplementation connection) { + this.connection = connection; + } + @Override + public void run() { + byte[] currentKey = HConstants.EMPTY_START_ROW; + try { + connection.getRegionLocation(TABLE_NAME, currentKey, true); + } catch (IOException e) { + LOG.error("Thread id: " + this.getId() + " exception: ", e); + this.exception = e; + } + } + public Exception getException() { + return exception; + } + } + + public static class LockSleepInjector extends ExceptionInjector { + @Override + public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + LOG.info("Interrupted exception", e); + } + } + + @Override + public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { } + + @Override + public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { } + } }