Skip to content

Commit

Permalink
HBASE-26069 Remove HStore.compactRecentForTestingAssumingDefaultPolic… (
Browse files Browse the repository at this point in the history
#3462)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
  • Loading branch information
Apache9 authored Jul 7, 2021
1 parent 29cd782 commit e65fc92
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 184 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -53,7 +52,6 @@
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -89,7 +87,6 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.querymatcher.ScanQueryMatcher;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
Expand Down Expand Up @@ -1747,64 +1744,6 @@ public void replayCompactionMarker(CompactionDescriptor compaction, boolean pick
}
}

/**
* This method tries to compact N recent files for testing.
* Note that because compacting "recent" files only makes sense for some policies,
* e.g. the default one, it assumes default policy is used. It doesn't use policy,
* but instead makes a compaction candidate list by itself.
* @param N Number of files.
*/
public void compactRecentForTestingAssumingDefaultPolicy(int N) throws IOException {
List<HStoreFile> filesToCompact;
boolean isMajor;

this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
filesToCompact = Lists.newArrayList(storeEngine.getStoreFileManager().getStorefiles());
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
HStoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = filesToCompact.indexOf(last);
Preconditions.checkArgument(idx != -1);
filesToCompact.subList(0, idx + 1).clear();
}
int count = filesToCompact.size();
if (N > count) {
throw new RuntimeException("Not enough files");
}

filesToCompact = filesToCompact.subList(count - N, count);
isMajor = (filesToCompact.size() == storeEngine.getStoreFileManager().getStorefileCount());
filesCompacting.addAll(filesToCompact);
Collections.sort(filesCompacting, storeEngine.getStoreFileManager()
.getStoreFileComparator());
}
} finally {
this.lock.readLock().unlock();
}

try {
// Ready to go. Have list of files to compact.
List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
.compactForTesting(filesToCompact, isMajor);
for (Path newFile: newFiles) {
// Move the compaction into place.
HStoreFile sf = moveFileIntoPlace(newFile);
if (this.getCoprocessorHost() != null) {
this.getCoprocessorHost().postCompact(this, sf, null, null, null);
}
replaceStoreFiles(filesToCompact, Collections.singletonList(sf));
refreshStoreSizeAndTotalBytes();
}
} finally {
synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact);
}
}
}

@Override
public boolean hasReferences() {
// Grab the read lock here, because we need to ensure that: only when the atomic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@
package org.apache.hadoop.hbase.regionserver.compactions;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
Expand Down Expand Up @@ -65,23 +62,6 @@ public List<Path> compact(final CompactionRequestImpl request,
return compact(request, defaultScannerFactory, writerFactory, throughputController, user);
}

/**
* Compact a list of files for testing. Creates a fake {@link CompactionRequestImpl} to pass to
* {@link #compact(CompactionRequestImpl, ThroughputController, User)};
* @param filesToCompact the files to compact. These are used as the compactionSelection for the
* generated {@link CompactionRequestImpl}.
* @param isMajor true to major compact (prune all deletes, max versions, etc)
* @return Product of compaction or an empty list if all cells expired or deleted and nothing \
* made it through the compaction.
* @throws IOException
*/
public List<Path> compactForTesting(Collection<HStoreFile> filesToCompact, boolean isMajor)
throws IOException {
CompactionRequestImpl cr = new CompactionRequestImpl(filesToCompact);
cr.setIsMajor(isMajor, isMajor);
return compact(cr, NoLimitThroughputController.INSTANCE, null);
}

@Override
protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
CompactionRequestImpl request) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IOTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
Expand Down Expand Up @@ -82,25 +81,18 @@ public class TestScannerSelectionUsingTTL {

public final int numFreshFiles, totalNumFiles;

/** Whether we are specifying the exact files to compact */
private final boolean explicitCompaction;

@Parameters
public static Collection<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
for (int numFreshFiles = 1; numFreshFiles <= 3; ++numFreshFiles) {
for (boolean explicitCompaction : new boolean[] { false, true }) {
params.add(new Object[] { numFreshFiles, explicitCompaction });
}
params.add(new Object[] { numFreshFiles });
}
return params;
}

public TestScannerSelectionUsingTTL(int numFreshFiles,
boolean explicitCompaction) {
public TestScannerSelectionUsingTTL(int numFreshFiles) {
this.numFreshFiles = numFreshFiles;
this.totalNumFiles = numFreshFiles + NUM_EXPIRED_FILES;
this.explicitCompaction = explicitCompaction;
}

@Test
Expand Down Expand Up @@ -152,13 +144,7 @@ public void testScannerSelection() throws IOException {
Set<String> accessedFiles = cache.getCachedFileNamesForTest();
LOG.debug("Files accessed during scan: " + accessedFiles);

// Exercise both compaction codepaths.
if (explicitCompaction) {
HStore store = region.getStore(FAMILY_BYTES);
store.compactRecentForTestingAssumingDefaultPolicy(totalNumFiles);
} else {
region.compact(false);
}
region.compact(false);

HBaseTestingUtility.closeRegionAndWAL(region);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test compaction framework and common functions
Expand All @@ -94,9 +96,12 @@ public class TestCompaction {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompaction.class);
HBaseClassTestRule.forClass(TestCompaction.class);

@Rule public TestName name = new TestName();
private static final Logger LOG = LoggerFactory.getLogger(TestCompaction.class);

@Rule
public TestName name = new TestName();
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected Configuration conf = UTIL.getConfiguration();

Expand Down Expand Up @@ -154,7 +159,6 @@ public void tearDown() throws Exception {
/**
* Verify that you can stop a long-running compaction
* (used during RS shutdown)
* @throws Exception
*/
@Test
public void testInterruptCompactionBySize() throws Exception {
Expand All @@ -180,7 +184,7 @@ public void testInterruptCompactionBySize() throws Exception {
}

HRegion spyR = spy(r);
doAnswer(new Answer() {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
r.writestate.writesEnabled = false;
Expand Down Expand Up @@ -256,7 +260,7 @@ public void testInterruptCompactionByTime() throws Exception {
}

HRegion spyR = spy(r);
doAnswer(new Answer() {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
r.writestate.writesEnabled = false;
Expand Down Expand Up @@ -311,15 +315,14 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

private int count() throws IOException {
int count = 0;
for (HStoreFile f: this.r.stores.
get(COLUMN_FAMILY_TEXT).getStorefiles()) {
for (HStoreFile f : this.r.stores.get(COLUMN_FAMILY_TEXT).getStorefiles()) {
HFileScanner scanner = f.getReader().getScanner(false, false);
if (!scanner.seekTo()) {
continue;
}
do {
count++;
} while(scanner.next());
} while (scanner.next());
}
return count;
}
Expand All @@ -344,7 +347,8 @@ public void testCompactionWithCorruptResult() throws Exception {

Collection<HStoreFile> storeFiles = store.getStorefiles();
DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
tool.compactForTesting(storeFiles, false);
CompactionRequestImpl request = new CompactionRequestImpl(storeFiles);
tool.compact(request, NoLimitThroughputController.INSTANCE, null);

// Now lets corrupt the compacted file.
FileSystem fs = store.getFileSystem();
Expand All @@ -363,7 +367,7 @@ public void testCompactionWithCorruptResult() throws Exception {
// in the 'tmp' directory;
assertTrue(fs.exists(origPath));
assertFalse(fs.exists(dstPath));
System.out.println("testCompactionWithCorruptResult Passed");
LOG.info("testCompactionWithCorruptResult Passed");
return;
}
fail("testCompactionWithCorruptResult failed since no exception was" +
Expand Down Expand Up @@ -418,28 +422,27 @@ public void testCompactionFailure() throws Exception {
Mockito.when(mockRegion.checkSplit()).
thenThrow(new RuntimeException("Thrown intentionally by test!"));

MetricsRegionWrapper metricsWrapper = new MetricsRegionWrapperImpl(r);
try (MetricsRegionWrapperImpl metricsWrapper = new MetricsRegionWrapperImpl(r)) {

long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long preFailedCount = metricsWrapper.getNumCompactionsFailed();
long preCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long preFailedCount = metricsWrapper.getNumCompactionsFailed();

CountDownLatch latch = new CountDownLatch(1);
Tracker tracker = new Tracker(latch);
thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER,
tracker, null);
// wait for the latch to complete.
latch.await(120, TimeUnit.SECONDS);

// compaction should have completed and been marked as failed due to error in split request
long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long postFailedCount = metricsWrapper.getNumCompactionsFailed();

assertTrue("Completed count should have increased (pre=" + preCompletedCount +
", post="+postCompletedCount+")",
postCompletedCount > preCompletedCount);
assertTrue("Failed count should have increased (pre=" + preFailedCount +
", post=" + postFailedCount + ")",
postFailedCount > preFailedCount);
CountDownLatch latch = new CountDownLatch(1);
Tracker tracker = new Tracker(latch);
thread.requestCompaction(mockRegion, store, "test custom comapction", PRIORITY_USER, tracker,
null);
// wait for the latch to complete.
latch.await(120, TimeUnit.SECONDS);

// compaction should have completed and been marked as failed due to error in split request
long postCompletedCount = metricsWrapper.getNumCompactionsCompleted();
long postFailedCount = metricsWrapper.getNumCompactionsFailed();

assertTrue("Completed count should have increased (pre=" + preCompletedCount + ", post=" +
postCompletedCount + ")", postCompletedCount > preCompletedCount);
assertTrue("Failed count should have increased (pre=" + preFailedCount + ", post=" +
postFailedCount + ")", postFailedCount > preFailedCount);
}
}

/**
Expand Down
Loading

0 comments on commit e65fc92

Please sign in to comment.