Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-26783 ScannerCallable doubly clears meta cache on retries #4168

Merged
merged 1 commit into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.createCloseRowBefore;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStartRow;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;


/**
Expand All @@ -43,6 +41,8 @@
@InterfaceAudience.Private
public class ReversedScannerCallable extends ScannerCallable {

private byte[] locationSearchKey;

/**
* @param connection
* @param tableName
Expand Down Expand Up @@ -70,6 +70,18 @@ public ReversedScannerCallable(ClusterConnection connection, TableName tableName
super(connection, tableName, scan, scanMetrics, rpcFactory, replicaId);
}

@Override
public void throwable(Throwable t, boolean retrying) {
// for reverse scans, we need to update cache using the search key found for the reverse scan
// range in prepare. Otherwise, we will see weird behavior at the table boundaries,
// when trying to clear cache for an empty row.
if (location != null && locationSearchKey != null) {
getConnection().updateCachedLocations(getTableName(),
location.getRegionInfo().getRegionName(),
locationSearchKey, t, location.getServerName());
}
}

/**
* @param reload force reload of server location
* @throws IOException
Expand All @@ -79,34 +91,37 @@ public void prepare(boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}

if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
&& getConnection().isTableDisabled(getTableName())) {
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
}

if (!instantiated || reload) {
// we should use range locate if
// 1. we do not want the start row
// 2. the start row is empty which means we need to locate to the last region.
if (scan.includeStartRow() && !isEmptyStartRow(getRow())) {
// Just locate the region with the row
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
getConnection(), getTableName(), getRow());
this.location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
throw new IOException("Failed to find location, tableName="
+ tableName + ", row=" + Bytes.toStringBinary(row) + ", reload="
+ reload);
}
RegionLocations rl = getRegionLocationsForPrepare(getRow());
this.location = getLocationForReplica(rl);
this.locationSearchKey = getRow();
} else {
// Need to locate the regions with the range, and the target location is
// the last one which is the previous region of last region scanner
// The locateStart row is an approximation. So we need to search between
// that and the actual row in order to really find the last region
byte[] locateStartRow = createCloseRowBefore(getRow());
List<HRegionLocation> locatedRegions = locateRegionsInRange(
locateStartRow, row, reload);
if (locatedRegions.isEmpty()) {
throw new DoNotRetryIOException(
"Does hbase:meta exist hole? Couldn't get regions for the range from "
+ Bytes.toStringBinary(locateStartRow) + " to "
+ Bytes.toStringBinary(row));
}
this.location = locatedRegions.get(locatedRegions.size() - 1);
Pair<HRegionLocation, byte[]> lastRegionAndKey = locateLastRegionInRange(
locateStartRow, getRow());
this.location = lastRegionAndKey.getFirst();
this.locationSearchKey = lastRegionAndKey.getSecond();
}

if (location == null || location.getServerName() == null) {
throw new IOException("Failed to find location, tableName="
+ getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload="
+ reload);
}

setStub(getConnection().getClient(getLocation().getServerName()));
checkIfRegionServerIsRemote();
instantiated = true;
Expand All @@ -124,33 +139,32 @@ public void prepare(boolean reload) throws IOException {
}

/**
* Get the corresponding regions for an arbitrary range of keys.
* Get the last region before the endkey, which will be used to execute the reverse scan
* @param startKey Starting row in range, inclusive
* @param endKey Ending row in range, exclusive
* @param reload force reload of server location
* @return A list of HRegionLocation corresponding to the regions that contain
* the specified range
* @throws IOException
* @return The last location, and the rowKey used to find it. May be null,
* if a region could not be found.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH",
justification="I thought I'd fixed it but FB still complains; see below")
private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
byte[] endKey, boolean reload) throws IOException {
private Pair<HRegionLocation, byte[]> locateLastRegionInRange(byte[] startKey, byte[] endKey)
throws IOException {
final boolean endKeyIsEndOfTable = Bytes.equals(endKey,
HConstants.EMPTY_END_ROW);
if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
throw new IllegalArgumentException("Invalid range: "
+ Bytes.toStringBinary(startKey) + " > "
+ Bytes.toStringBinary(endKey));
}
List<HRegionLocation> regionList = new ArrayList<HRegionLocation>();

HRegionLocation lastRegion = null;
byte[] lastFoundKey = null;
byte[] currentKey = startKey;

do {
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload, id,
getConnection(), getTableName(), currentKey);
HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null;
if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) {
regionList.add(regionLocation);
RegionLocations rl = getRegionLocationsForPrepare(currentKey);
HRegionLocation regionLocation = getLocationForReplica(rl);
if (regionLocation.getRegionInfo().containsRow(currentKey)) {
lastFoundKey = currentKey;
lastRegion = regionLocation;
} else {
// FindBugs: NP_NULL_ON_SOME_PATH Complaining about regionLocation
throw new DoNotRetryIOException("Does hbase:meta exist hole? Locating row "
Expand All @@ -160,7 +174,8 @@ private List<HRegionLocation> locateRegionsInRange(byte[] startKey,
currentKey = regionLocation.getRegionInfo().getEndKey();
} while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW)
&& (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0));
return regionList;

return new Pair<>(lastRegion, lastFoundKey);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
Expand Down Expand Up @@ -147,6 +148,32 @@ HBaseRpcController getController() {
return controller;
}

protected final HRegionLocation getLocationForReplica(RegionLocations locs)
throws HBaseIOException {
HRegionLocation loc = id < locs.size() ? locs.getRegionLocation(id) : null;
if (loc == null || loc.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);
}
return loc;
}

/**
* Fetch region locations for the row. Since this is for prepare, we always useCache.
* This is because we can be sure that RpcRetryingCaller will have cleared the cache
* in error handling if this is a retry.
*
* @param row the row to look up region location for
*/
protected final RegionLocations getRegionLocationsForPrepare(byte[] row)
throws IOException {
// always use cache, because cache will have been cleared if necessary
// in the try/catch before retrying
return RpcRetryingCallerWithReadReplicas.getRegionLocations(true, id, getConnection(),
getTableName(), row);
}

/**
* @param reload force reload of server location
* @throws IOException
Expand All @@ -156,14 +183,14 @@ public void prepare(boolean reload) throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(!reload,
id, getConnection(), getTableName(), getRow());
location = id < rl.size() ? rl.getRegionLocation(id) : null;
if (location == null || location.getServerName() == null) {
// With this exception, there will be a retry. The location can be null for a replica
// when the table is created or after a split.
throw new HBaseIOException("There is no location for replica id #" + id);

if (reload && getTableName() != null && !getTableName().equals(TableName.META_TABLE_NAME)
&& getConnection().isTableDisabled(getTableName())) {
throw new TableNotEnabledException(getTableName().getNameAsString() + " is disabled.");
}

RegionLocations rl = getRegionLocationsForPrepare(getRow());
location = getLocationForReplica(rl);
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
if (!instantiated || reload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,22 @@
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
Expand All @@ -32,64 +42,89 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
@Category({ ClientTests.class, SmallTests.class })
public class TestReversedScannerCallable {

private static final TableName TABLE_NAME = TableName.valueOf("TestReversedScannerCallable");

private static final String HOSTNAME = "localhost";
private static final ServerName SERVERNAME = ServerName.valueOf(HOSTNAME, 60030, 123);
private static final byte[] ROW = Bytes.toBytes("row1");
private static final Scan DEFAULT_SCAN = new Scan().withStartRow(ROW, true).setReversed(true);

@Mock
private ClusterConnection connection;
@Mock
private Scan scan;
@Mock
private RpcControllerFactory rpcFactory;
@Mock
private RegionLocations regionLocations;

private final byte[] ROW = Bytes.toBytes("row1");
@Mock
private HRegionLocation regionLocation;

@Before
public void setUp() throws Exception {
byte[] ROW_BEFORE = ConnectionUtils.createCloseRowBefore(ROW);
HRegionLocation regionLocation = Mockito.mock(HRegionLocation.class);
ServerName serverName = Mockito.mock(ServerName.class);
HRegionInfo regionInfo = Mockito.mock(HRegionInfo.class);

Mockito.when(connection.getConfiguration()).thenReturn(new Configuration());
Mockito.when(regionLocations.size()).thenReturn(1);
Mockito.when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
Mockito.when(regionLocation.getHostname()).thenReturn("localhost");
Mockito.when(regionLocation.getRegionInfo()).thenReturn(regionInfo);
Mockito.when(regionLocation.getServerName()).thenReturn(serverName);
Mockito.when(regionInfo.containsRow(ROW_BEFORE)).thenReturn(true);
Mockito.when(scan.includeStartRow()).thenReturn(true);
Mockito.when(scan.getStartRow()).thenReturn(ROW);
when(connection.getConfiguration()).thenReturn(new Configuration());
when(regionLocations.size()).thenReturn(1);
when(regionLocations.getRegionLocation(0)).thenReturn(regionLocation);
when(regionLocation.getHostname()).thenReturn(HOSTNAME);
when(regionLocation.getServerName()).thenReturn(SERVERNAME);
}

@Test
public void testPrepareDoesNotUseCache() throws Exception {
TableName tableName = TableName.valueOf("MyTable");
Mockito.when(connection.relocateRegion(tableName, ROW, 0)).thenReturn(regionLocations);
public void testPrepareAlwaysUsesCache() throws Exception {
when(connection.locateRegion(TABLE_NAME, ROW, true, true, 0))
.thenReturn(regionLocations);

ReversedScannerCallable callable =
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory);
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);
callable.prepare(false);
callable.prepare(true);

Mockito.verify(connection).relocateRegion(tableName, ROW, 0);
verify(connection, times(2)).locateRegion(TABLE_NAME, ROW, true, true, 0);
}

@Test
public void testHandleDisabledTable() throws IOException {
when(connection.isTableDisabled(TABLE_NAME)).thenReturn(true);

ReversedScannerCallable callable =
new ReversedScannerCallable(connection, TABLE_NAME, DEFAULT_SCAN, null, rpcFactory, 0);

try {
callable.prepare(true);
fail("should have thrown TableNotEnabledException");
} catch (TableNotEnabledException e) {
// pass
}
}

@Test
public void testPrepareUsesCache() throws Exception {
TableName tableName = TableName.valueOf("MyTable");
Mockito.when(connection.locateRegion(tableName, ROW, true, true, 0))
public void testUpdateSearchKeyCacheLocation() throws IOException {
byte[] regionName = HRegionInfo.createRegionName(TABLE_NAME,
ConnectionUtils.createCloseRowBefore(ConnectionUtils.MAX_BYTE_ARRAY), "123", false);
HRegionInfo mockRegionInfo = mock(HRegionInfo.class);
when(mockRegionInfo.containsRow(ConnectionUtils.MAX_BYTE_ARRAY)).thenReturn(true);
when(mockRegionInfo.getEndKey()).thenReturn(HConstants.EMPTY_END_ROW);
when(mockRegionInfo.getRegionName()).thenReturn(regionName);
when(regionLocation.getRegionInfo()).thenReturn(mockRegionInfo);

IOException testThrowable = new IOException("test throwable");

when(connection.locateRegion(TABLE_NAME, ConnectionUtils.MAX_BYTE_ARRAY, true, true, 0))
.thenReturn(regionLocations);

Scan scan = new Scan().setReversed(true);
ReversedScannerCallable callable =
new ReversedScannerCallable(connection, tableName, scan, null, rpcFactory);
new ReversedScannerCallable(connection, TABLE_NAME, scan, null, rpcFactory, 0);

callable.prepare(false);

Mockito.verify(connection).locateRegion(tableName, ROW, true, true, 0);
callable.throwable(testThrowable, true);

verify(connection).updateCachedLocations(TABLE_NAME, regionName,
ConnectionUtils.MAX_BYTE_ARRAY, testThrowable, SERVERNAME);
}
}
Loading