Skip to content

Commit

Permalink
Merge branch 'apache:HBASE-26233' into HBASE-26233
Browse files Browse the repository at this point in the history
  • Loading branch information
lshangq authored Nov 26, 2021
2 parents c30861a + e44ddde commit b863b1e
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ public void testRegionReplicationOnMidClusterWithRacks() {

@Test
public void testRegionReplicationOnLargeClusterWithRacks() {
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec
conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 10 sec
loadBalancer.onConfigurationChange(conf);
int numNodes = 100;
int numRegions = numNodes * 30;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,11 @@ public long getWALPosition(ServerName serverName, String queueId, String fileNam
return 0;
}

/**
* This implement must update the cversion of root {@link #queuesZNode}. The optimistic lock of
* the {@link #getAllWALs()} method is based on the cversion of root {@link #queuesZNode}.
* @see #getAllWALs() to show the usage of the cversion of root {@link #queuesZNode} .
*/
@Override
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
ServerName destServerName) throws ReplicationException {
Expand Down Expand Up @@ -417,6 +422,12 @@ public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, S
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
// Append new queue id for prevent lock competition in zookeeper server.
String claimLockZNode = ZNodePaths.joinZNode(queuesZNode, "cversion_" + newQueueId);
// A trick for update the cversion of root queuesZNode .
// The optimistic lock of the getAllWALs() method is based on the cversion of root queuesZNode
listOfOps.add(ZKUtilOp.createAndFailSilent(claimLockZNode, HConstants.EMPTY_BYTE_ARRAY));
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(claimLockZNode));

LOG.trace("The multi list size is {}", listOfOps.size());
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
Expand Down Expand Up @@ -505,6 +516,13 @@ protected int getQueuesZNodeCversion() throws KeeperException {
return stat.getCversion();
}

/**
* The optimistic lock of this implement is based on the cversion of root {@link #queuesZNode}.
* Therefore, we must update the cversion of root {@link #queuesZNode} when migrate wal nodes to
* other queues.
* @see #claimQueue(ServerName, String, ServerName) as an example of updating root
* {@link #queuesZNode} cversion.
*/
@Override
public Set<String> getAllWALs() throws ReplicationException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,29 @@ public void testAddRemoveLog() throws ReplicationException {
}
}

// For HBASE-12865
// For HBASE-12865, HBASE-26482
@Test
public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
STORAGE.addWAL(serverName1, "1", "file");
STORAGE.addWAL(serverName1, "2", "file");

int v0 = STORAGE.getQueuesZNodeCversion();
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
// Avoid claimQueue update cversion for prepare server2 rsNode.
STORAGE.addWAL(serverName2, "1", "file");
STORAGE.addWAL(serverName2, "2", "file");

int v0 = STORAGE.getQueuesZNodeCversion();

STORAGE.claimQueue(serverName1, "1", serverName2);
int v1 = STORAGE.getQueuesZNodeCversion();
// cversion should increase by 1 since a child node is deleted
assertEquals(1, v1 - v0);
// cversion should be increased by claimQueue method.
assertTrue(v1 > v0);

STORAGE.claimQueue(serverName1, "2", serverName2);
int v2 = STORAGE.getQueuesZNodeCversion();
// cversion should be increased by claimQueue method.
assertTrue(v2 > v1);
}

private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ public long timeOfOldestEdit() {
}

/**
* This method is protected under {@link HStore#lock} write lock,<br/>
* and this method is used by {@link HStore#updateStorefiles} after flushing is completed.<br/>
* The passed snapshot was successfully persisted; it can be let go.
* @param id Id of the snapshot to clean out.
* @see MemStore#snapshot()
Expand All @@ -245,6 +247,10 @@ public void clearSnapshot(long id) throws UnexpectedStateException {
}
// OK. Passed in snapshot is same as current snapshot. If not-empty,
// create a new snapshot and let the old one go.
doClearSnapShot();
}

protected void doClearSnapShot() {
Segment oldSnapshot = this.snapshot;
if (!this.snapshot.isEmpty()) {
this.snapshot = SegmentFactory.instance().createImmutableSegment(this.comparator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ public String getFamilyName() {
return Bytes.toString(getFamilyNameInBytes());
}

/**
* This method is protected under {@link HStore#lock} read lock.
*/
@Override
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
MutableSegment activeTmp = getActive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,21 @@ protected long heapSize() {
}

@Override
/*
/**
* This method is protected under {@link HStore#lock} read lock. <br/>
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<KeyValueScanner> list = new ArrayList<>();
addToScanners(getActive(), readPt, list);
addToScanners(snapshot.getAllSegments(), readPt, list);
addToScanners(getSnapshotSegments(), readPt, list);
return list;
}

protected List<Segment> getSnapshotSegments() {
return snapshot.getAllSegments();
}

@Override
protected List<Segment> getSegments() throws IOException {
List<Segment> list = new ArrayList<>(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

private static final Logger LOG = LoggerFactory.getLogger(HStore.class);

protected final MemStore memstore;
/**
* TODO:After making the {@link DefaultMemStore} extensible in {@link HStore} by HBASE-26476,we
* change it back to final.
*/
protected MemStore memstore;
// This stores directory in the filesystem.
private final HRegion region;
protected Configuration conf;
Expand Down Expand Up @@ -1222,6 +1225,14 @@ private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws I
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
/**
* NOTE:we should keep clearSnapshot method inside the write lock because clearSnapshot may
* close {@link DefaultMemStore#snapshot}, which may be used by
* {@link DefaultMemStore#getScanners}.
*/
if (snapshotId > 0) {
this.memstore.clearSnapshot(snapshotId);
}
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
Expand All @@ -1230,13 +1241,7 @@ private boolean updateStorefiles(List<HStoreFile> sfs, long snapshotId) throws I
// the lock.
this.lock.writeLock().unlock();
}
// We do not need to call clearSnapshot method inside the write lock.
// The clearSnapshot itself is thread safe, which can be called at the same time with other
// memstore operations expect snapshot and clearSnapshot. And for these two methods, in HRegion
// we can guarantee that there is only one onging flush, so they will be no race.
if (snapshotId > 0) {
this.memstore.clearSnapshot(snapshotId);
}

// notify to be called here - only in case of flushes
notifyChangedReadersObservers(sfs);
if (LOG.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ public class MemStoreLABImpl implements MemStoreLAB {

// This flag is for closing this instance, its set when clearing snapshot of
// memstore
private volatile boolean closed = false;
private final AtomicBoolean closed = new AtomicBoolean(false);;
// This flag is for reclaiming chunks. Its set when putting chunks back to
// pool
private AtomicBoolean reclaimed = new AtomicBoolean(false);
private final AtomicBoolean reclaimed = new AtomicBoolean(false);
// Current count of open scanners which reading data from this MemStoreLAB
private final AtomicInteger openScannerCount = new AtomicInteger();

Expand Down Expand Up @@ -259,7 +259,9 @@ private static Cell createChunkCell(ByteBuffer buf, int offset, int len, int tag
*/
@Override
public void close() {
this.closed = true;
if (!this.closed.compareAndSet(false, true)) {
return;
}
// We could put back the chunks to pool for reusing only when there is no
// opening scanner which will read their data
int count = openScannerCount.get();
Expand All @@ -286,14 +288,15 @@ public void incScannerCount() {
@Override
public void decScannerCount() {
int count = this.openScannerCount.decrementAndGet();
if (this.closed && count == 0) {
if (this.closed.get() && count == 0) {
recycleChunks();
}
}

private void recycleChunks() {
if (reclaimed.compareAndSet(false, true)) {
chunkCreator.putbackChunks(chunks);
chunks.clear();
}
}

Expand Down Expand Up @@ -409,13 +412,21 @@ BlockingQueue<Chunk> getPooledChunks() {
return pooledChunks;
}

Integer getNumOfChunksReturnedToPool() {
Integer getNumOfChunksReturnedToPool(Set<Integer> chunksId) {
int i = 0;
for (Integer id : this.chunks) {
for (Integer id : chunksId) {
if (chunkCreator.isChunkInPool(id)) {
i++;
}
}
return i;
}

boolean isReclaimed() {
return reclaimed.get();
}

boolean isClosed() {
return closed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.OperationWithAttributes;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
Expand Down Expand Up @@ -1278,6 +1279,7 @@ public String getScanDetailsWithId(long scannerId) {
StringBuilder builder = new StringBuilder();
builder.append("table: ").append(scanner.getRegionInfo().getTable().getNameAsString());
builder.append(" region: ").append(scanner.getRegionInfo().getRegionNameAsString());
builder.append(" operation_id: ").append(scanner.getOperationId());
return builder.toString();
}

Expand All @@ -1290,6 +1292,12 @@ public String getScanDetailsWithRequest(ScanRequest request) {
StringBuilder builder = new StringBuilder();
builder.append("table: ").append(region.getRegionInfo().getTable().getNameAsString());
builder.append(" region: ").append(region.getRegionInfo().getRegionNameAsString());
for (NameBytesPair pair : request.getScan().getAttributeList()) {
if (OperationWithAttributes.ID_ATRIBUTE.equals(pair.getName())) {
builder.append(" operation_id: ").append(Bytes.toString(pair.getValue().toByteArray()));
break;
}
}
return builder.toString();
} catch (IOException ignored) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ public interface RegionScanner extends InternalScanner {
*/
int getBatch();

/**
* @return The Scanner's {@link org.apache.hadoop.hbase.client.Scan#ID_ATRIBUTE} value,
* or null if not set.
*/
default String getOperationId() {
return null;
}

/**
* Grab the next row's worth of values. This is a special internal method to be called from
* coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
private final long maxResultSize;
private final ScannerContext defaultScannerContext;
private final FilterWrapper filter;
private final String operationId;

private RegionServerServices rsServices;

Expand Down Expand Up @@ -121,6 +122,7 @@ private static boolean hasNonce(HRegion region, long nonce) {
defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
this.stopRow = scan.getStopRow();
this.includeStopRow = scan.includeStopRow();
this.operationId = scan.getId();

// synchronize on scannerReadPoints so that nobody calculates
// getSmallestReadPoint, before scannerReadPoints is updated.
Expand Down Expand Up @@ -215,6 +217,11 @@ public int getBatch() {
return this.defaultScannerContext.getBatchLimit();
}

@Override
public String getOperationId() {
return operationId;
}

/**
* Reset both the filter and the old filter.
* @throws IOException in case a filter raises an I/O exception.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4446,6 +4446,23 @@ public boolean isFamilyEssential(byte[] name) {
}
}

@Test
public void testScannerOperationId() throws IOException {
region = initHRegion(tableName, method, CONF, COLUMN_FAMILY_BYTES);
Scan scan = new Scan();
RegionScanner scanner = region.getScanner(scan);
assertNull(scanner.getOperationId());
scanner.close();

String operationId = "test_operation_id_0101";
scan = new Scan().setId(operationId);
scanner = region.getScanner(scan);
assertEquals(operationId, scanner.getOperationId());
scanner.close();

HBaseTestingUtil.closeRegionAndWAL(this.region);
}

/**
* Write an HFile block full with Cells whose qualifier that are identical between
* 0 and Short.MAX_VALUE. See HBASE-13329.
Expand Down
Loading

0 comments on commit b863b1e

Please sign in to comment.