Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-24956 : ConnectionManager#locateRegionInMeta waits for user region lock indef… #2427

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class ConnectionConfiguration {
// toggle for async/sync prefetch
private final boolean clientScannerAsyncPrefetch;

/**
/**
* Constructor
* @param conf Configuration object
*/
Expand Down Expand Up @@ -208,5 +208,4 @@ public boolean isClientScannerAsyncPrefetch() {
public int getRpcTimeout() {
return rpcTimeout;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -865,13 +865,15 @@ private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, bool
}
// Query the meta region
long pauseBase = this.pause;
userRegionLock.lock();
takeUserRegionLock();
try {
if (useCache) {// re-check cache after get lock
RegionLocations locations = getCachedLocation(tableName, row);
if (locations != null && locations.getRegionLocation(replicaId) != null) {
return locations;
}
// We don't need to check if useCache is enabled or not. Even if useCache is false
// we already cleared the cache for this row before acquiring userRegion lock so if this
// row is present in cache that means some other thread has populated it while we were
// waiting to acquire user region lock.
RegionLocations locations = getCachedLocation(tableName, row);
if (locations != null && locations.getRegionLocation(replicaId) != null) {
return locations;
}
if (relocateMeta) {
relocateRegion(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
Expand All @@ -894,7 +896,7 @@ rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSeco
}
tableNotFound = false;
// convert the row result into the HRegionLocation we need!
RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
if (locations == null || locations.getRegionLocation(replicaId) == null) {
throw new IOException("RegionInfo null in " + tableName + ", row=" + regionInfoRow);
}
Expand Down Expand Up @@ -970,6 +972,19 @@ rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSeco
}
}

void takeUserRegionLock() throws IOException {
try {
long waitTime = connectionConfig.getMetaOperationTimeout();
if (!userRegionLock.tryLock(waitTime, TimeUnit.MILLISECONDS)) {
throw new LockTimeoutException("Failed to get user region lock in"
+ waitTime + " ms. " + " for accessing meta region server.");
}
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for a lock", ie);
throw ExceptionUtil.asInterrupt(ie);
}
}

/**
* Put a newly discovered HRegionLocation into the cache.
* @param tableName The table name.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.yetus.audience.InterfaceAudience;

/*
Thrown whenever we are not able to get the lock within the specified wait time.
*/
@InterfaceAudience.Public
public class LockTimeoutException extends HBaseIOException {
public LockTimeoutException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({MediumTests.class, ClientTests.class})
public class TestMetaCache {
Expand All @@ -70,8 +72,8 @@ public class TestMetaCache {
private static final TableName TABLE_NAME = TableName.valueOf("test_table");
private static final byte[] FAMILY = Bytes.toBytes("fam1");
private static final byte[] QUALIFIER = Bytes.toBytes("qual");

private static HRegionServer badRS;
private static final Logger LOG = LoggerFactory.getLogger(TestMetaCache.class);

/**
* @throws java.lang.Exception
Expand Down Expand Up @@ -369,4 +371,77 @@ public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest
throws ServiceException {
}
}

@Test
public void testUserRegionLockThrowsException() throws IOException, InterruptedException {
((FakeRSRpcServices)badRS.getRSRpcServices()).setExceptionInjector(new LockSleepInjector());
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
conf.setLong(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 2000);
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 2000);

try (ConnectionImplementation conn =
(ConnectionImplementation) ConnectionFactory.createConnection(conf)) {
ClientThread client1 = new ClientThread(conn);
ClientThread client2 = new ClientThread(conn);
client1.start();
client2.start();
client1.join();
client2.join();
// One thread will get the lock but will sleep in LockExceptionInjector#throwOnScan and
// eventually fail since the sleep time is more than hbase client scanner timeout period.
// Other thread will wait to acquire userRegionLock.
// Have no idea which thread will be scheduled first. So need to check both threads.

// Both the threads will throw exception. One thread will throw exception since after
// acquiring user region lock, it is sleeping for 5 seconds when the scanner time out period
// is 2 seconds.
// Other thread will throw exception since it was not able to get hold of user region lock
// within meta operation timeout period.
assertNotNull(client1.getException());
assertNotNull(client2.getException());

assertTrue(client1.getException() instanceof LockTimeoutException
^ client2.getException() instanceof LockTimeoutException);
}
}

private final class ClientThread extends Thread {
private Exception exception;
private ConnectionImplementation connection;

private ClientThread(ConnectionImplementation connection) {
this.connection = connection;
}
@Override
public void run() {
byte[] currentKey = HConstants.EMPTY_START_ROW;
try {
connection.getRegionLocation(TABLE_NAME, currentKey, true);
} catch (IOException e) {
LOG.error("Thread id: " + this.getId() + " exception: ", e);
this.exception = e;
}
}
public Exception getException() {
return exception;
}
}

public static class LockSleepInjector extends ExceptionInjector {
@Override
public void throwOnScan(FakeRSRpcServices rpcServices, ClientProtos.ScanRequest request) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.info("Interrupted exception", e);
}
}

@Override
public void throwOnGet(FakeRSRpcServices rpcServices, ClientProtos.GetRequest request) { }

@Override
public void throwOnMutate(FakeRSRpcServices rpcServices, ClientProtos.MutateRequest request) { }
}
}