Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport "HBASE-25779 HRegionServer#compactSplitThread should be private" to branch-2 #3203

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public class HRegionServer extends Thread implements
private ReplicationSinkService replicationSinkHandler;

// Compactions
public CompactSplit compactSplitThread;
private CompactSplit compactSplitThread;

/**
* Map of regions currently being served by this region server. Key is the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,10 @@ private boolean flushRegion(final FlushRegionEntry fqe) {
LOG.warn("{} has too many store files({}); delaying flush up to {} ms",
region.getRegionInfo().getEncodedName(), getStoreFileCount(region),
this.blockingWaitTime);
if (!this.server.compactSplitThread.requestSplit(region)) {
final CompactSplit compactSplitThread = server.getCompactSplitThread();
if (!compactSplitThread.requestSplit(region)) {
try {
this.server.compactSplitThread.requestSystemCompaction(region,
compactSplitThread.requestSystemCompaction(region,
Thread.currentThread().getName());
} catch (IOException e) {
e = e instanceof RemoteException ?
Expand Down Expand Up @@ -624,16 +625,17 @@ private boolean flushRegion(HRegion region, boolean emergencyFlush,

tracker.beforeExecution();
lock.readLock().lock();
final CompactSplit compactSplitThread = server.getCompactSplitThread();
try {
notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flushcache(families, false, tracker);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit().isPresent();
if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region);
compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,37 +234,26 @@ public long getTotalRowActionRequestCount() {

@Override
public int getSplitQueueSize() {
if (this.regionServer.compactSplitThread == null) {
return 0;
}
return this.regionServer.compactSplitThread.getSplitQueueSize();
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
return compactSplit == null ? 0 : compactSplit.getSplitQueueSize();
}

@Override
public int getCompactionQueueSize() {
//The thread could be zero. if so assume there is no queue.
if (this.regionServer.compactSplitThread == null) {
return 0;
}
return this.regionServer.compactSplitThread.getCompactionQueueSize();
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
return compactSplit == null ? 0 : compactSplit.getCompactionQueueSize();
}

@Override
public int getSmallCompactionQueueSize() {
//The thread could be zero. if so assume there is no queue.
if (this.regionServer.compactSplitThread == null) {
return 0;
}
return this.regionServer.compactSplitThread.getSmallCompactionQueueSize();
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
return compactSplit == null ? 0 : compactSplit.getSmallCompactionQueueSize();
}

@Override
public int getLargeCompactionQueueSize() {
//The thread could be zero. if so assume there is no queue.
if (this.regionServer.compactSplitThread == null) {
return 0;
}
return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
final CompactSplit compactSplit = regionServer.getCompactSplitThread();
return compactSplit == null ? 0 : compactSplit.getLargeCompactionQueueSize();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,19 +124,20 @@ public static void dumpRowLock(HRegionServer hrs, PrintWriter out) {
}
}

public static void dumpQueue(HRegionServer hrs, PrintWriter out)
throws IOException {
if (hrs.compactSplitThread != null) {
public static void dumpQueue(HRegionServer hrs, PrintWriter out) {
final CompactSplit compactSplit = hrs.getCompactSplitThread();
if (compactSplit != null) {
// 1. Print out Compaction/Split Queue
out.println("Compaction/Split Queue summary: "
+ hrs.compactSplitThread.toString() );
out.println(hrs.compactSplitThread.dumpQueue());
out.println("Compaction/Split Queue summary: " + compactSplit);
out.println(compactSplit.dumpQueue());
}

if (hrs.getMemStoreFlusher() != null) {
final MemStoreFlusher memStoreFlusher = hrs.getMemStoreFlusher();
if (memStoreFlusher != null) {
// 2. Print out flush Queue
out.println("\nFlush Queue summary: " + hrs.getMemStoreFlusher().toString());
out.println(hrs.getMemStoreFlusher().dumpQueue());
out.println();
out.println("Flush Queue summary: " + memStoreFlusher);
out.println(memStoreFlusher.dumpQueue());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1707,17 +1707,18 @@ public CompactRegionResponse compactRegion(final RpcController controller,
@Override
public CompactionSwitchResponse compactionSwitch(RpcController controller,
CompactionSwitchRequest request) throws ServiceException {
final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
try {
checkOpen();
requestCount.increment();
boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
boolean prevState = compactSplitThread.isCompactionsEnabled();
CompactionSwitchResponse response =
CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
if (prevState == request.getEnabled()) {
// passed in requested state is same as current state. No action required
return response;
}
regionServer.compactSplitThread.switchCompaction(request.getEnabled());
compactSplitThread.switchCompaction(request.getEnabled());
return response;
} catch (IOException ie) {
throw new ServiceException(ie);
Expand Down Expand Up @@ -1760,7 +1761,7 @@ public FlushRegionResponse flushRegion(final RpcController controller,
}
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,
regionServer.getCompactSplitThread().requestSystemCompaction(region,
"Compaction through user triggered flush");
}
builder.setFlushed(flushResult.isFlushSucceeded());
Expand Down Expand Up @@ -1876,17 +1877,18 @@ public ClearCompactionQueuesResponse clearCompactionQueues(RpcController control
ClearCompactionQueuesResponse.Builder respBuilder = ClearCompactionQueuesResponse.newBuilder();
requestCount.increment();
if (clearCompactionQueues.compareAndSet(false,true)) {
final CompactSplit compactSplitThread = regionServer.getCompactSplitThread();
try {
checkOpen();
regionServer.getRegionServerCoprocessorHost().preClearCompactionQueues();
for (String queueName : request.getQueueNameList()) {
LOG.debug("clear " + queueName + " compaction queue");
switch (queueName) {
case "long":
regionServer.compactSplitThread.clearLongCompactionsQueue();
compactSplitThread.clearLongCompactionsQueue();
break;
case "short":
regionServer.compactSplitThread.clearShortCompactionsQueue();
compactSplitThread.clearShortCompactionsQueue();
break;
default:
LOG.warn("Unknown queue name " + queueName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,11 @@ private void compactAndWait() throws IOException, InterruptedException {
final long maxWaitime = System.currentTimeMillis() + 500;
boolean cont;
do {
cont = rs.compactSplitThread.getCompactionQueueSize() == 0;
cont = rs.getCompactSplitThread().getCompactionQueueSize() == 0;
Threads.sleep(1);
} while (cont && System.currentTimeMillis() < maxWaitime);

while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
Threads.sleep(1);
}
LOG.debug("Compaction queue size reached 0, continuing");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void loadTest() throws Exception {
// Wait until compaction completes
Threads.sleepWithoutInterrupt(5000);
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
while (rs.compactSplitThread.getCompactionQueueSize() > 0) {
while (rs.getCompactSplitThread().getCompactionQueueSize() > 0) {
Threads.sleep(50);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,39 +119,39 @@ public void testThreadPoolSizeTuning() throws Exception {
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);

// check initial configuration of thread pool sizes
assertEquals(3, regionServer.compactSplitThread.getLargeCompactionThreadNum());
assertEquals(4, regionServer.compactSplitThread.getSmallCompactionThreadNum());
assertEquals(5, regionServer.compactSplitThread.getSplitThreadNum());
assertEquals(3, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
assertEquals(4, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(5, regionServer.getCompactSplitThread().getSplitThreadNum());

// change bigger configurations and do online update
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 4);
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 5);
conf.setInt(CompactSplit.SPLIT_THREADS, 6);
try {
regionServer.compactSplitThread.onConfigurationChange(conf);
regionServer.getCompactSplitThread().onConfigurationChange(conf);
} catch (IllegalArgumentException iae) {
Assert.fail("Update bigger configuration failed!");
}

// check again after online update
assertEquals(4, regionServer.compactSplitThread.getLargeCompactionThreadNum());
assertEquals(5, regionServer.compactSplitThread.getSmallCompactionThreadNum());
assertEquals(6, regionServer.compactSplitThread.getSplitThreadNum());
assertEquals(4, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
assertEquals(5, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(6, regionServer.getCompactSplitThread().getSplitThreadNum());

// change smaller configurations and do online update
conf.setInt(CompactSplit.LARGE_COMPACTION_THREADS, 2);
conf.setInt(CompactSplit.SMALL_COMPACTION_THREADS, 3);
conf.setInt(CompactSplit.SPLIT_THREADS, 4);
try {
regionServer.compactSplitThread.onConfigurationChange(conf);
regionServer.getCompactSplitThread().onConfigurationChange(conf);
} catch (IllegalArgumentException iae) {
Assert.fail("Update smaller configuration failed!");
}

// check again after online update
assertEquals(2, regionServer.compactSplitThread.getLargeCompactionThreadNum());
assertEquals(3, regionServer.compactSplitThread.getSmallCompactionThreadNum());
assertEquals(4, regionServer.compactSplitThread.getSplitThreadNum());
assertEquals(2, regionServer.getCompactSplitThread().getLargeCompactionThreadNum());
assertEquals(3, regionServer.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(4, regionServer.getCompactSplitThread().getSplitThreadNum());
} finally {
conn.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -91,26 +91,25 @@ public static void tearDown() throws Exception {

/**
* Check if the number of compaction threads changes online
* @throws IOException
*/
@Test
public void testNumCompactionThreadsOnlineChange() throws IOException {
assertTrue(rs1.compactSplitThread != null);
public void testNumCompactionThreadsOnlineChange() {
assertNotNull(rs1.getCompactSplitThread());
int newNumSmallThreads =
rs1.compactSplitThread.getSmallCompactionThreadNum() + 1;
rs1.getCompactSplitThread().getSmallCompactionThreadNum() + 1;
int newNumLargeThreads =
rs1.compactSplitThread.getLargeCompactionThreadNum() + 1;
rs1.getCompactSplitThread().getLargeCompactionThreadNum() + 1;

conf.setInt("hbase.regionserver.thread.compaction.small",
newNumSmallThreads);
newNumSmallThreads);
conf.setInt("hbase.regionserver.thread.compaction.large",
newNumLargeThreads);
newNumLargeThreads);
rs1.getConfigurationManager().notifyAllObservers(conf);

assertEquals(newNumSmallThreads,
rs1.compactSplitThread.getSmallCompactionThreadNum());
rs1.getCompactSplitThread().getSmallCompactionThreadNum());
assertEquals(newNumLargeThreads,
rs1.compactSplitThread.getLargeCompactionThreadNum());
rs1.getCompactSplitThread().getLargeCompactionThreadNum());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void testThroughputTuning() throws Exception {
TEST_UTIL.waitTableAvailable(tableName);
HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName);
PressureAwareCompactionThroughputController throughputController =
(PressureAwareCompactionThroughputController) regionServer.compactSplitThread
(PressureAwareCompactionThroughputController) regionServer.getCompactSplitThread()
.getCompactionThroughputController();
assertEquals(10L * 1024 * 1024, throughputController.getMaxThroughput(), EPSILON);
Table table = conn.getTable(tableName);
Expand All @@ -234,9 +234,9 @@ public void testThroughputTuning() throws Exception {

conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
NoLimitThroughputController.class.getName());
regionServer.compactSplitThread.onConfigurationChange(conf);
regionServer.getCompactSplitThread().onConfigurationChange(conf);
assertTrue(throughputController.isStopped());
assertTrue(regionServer.compactSplitThread.getCompactionThroughputController()
assertTrue(regionServer.getCompactSplitThread().getCompactionThroughputController()
instanceof NoLimitThroughputController);
} finally {
conn.close();
Expand Down