Skip to content

Commit

Permalink
HBASE-26043 CompactionServer support compact TTL data (#3494)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
nyl3532016 authored Jul 26, 2021
1 parent 74fbc1b commit 85f0291
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public class CompactionThreadManager implements ThroughputControllerService {
this.server = server;
try {
this.rootDir = CommonFSUtils.getRootDir(this.conf);
this.tableDescriptors = new FSTableDescriptors(conf);
this.tableDescriptors = new FSTableDescriptors(CommonFSUtils.getCurrentFileSystem(conf),
CommonFSUtils.getRootDir(conf), true, false);
int largeThreads =
Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
Expand Down Expand Up @@ -213,7 +214,8 @@ Pair<HStore, Optional<CompactionContext>> selectCompaction(RegionInfo regionInfo
tableDescriptors.get(regionInfo.getTable());
HStore store = getStore(conf, server.getFileSystem(), rootDir,
tableDescriptors.get(regionInfo.getTable()), regionInfo, cfd.getNameAsString());

// handle TTL case
store.removeUnneededFiles(false);
// CompactedHFilesDischarger only run on regionserver, so compactionserver does not have
// opportunity to clean compacted file at that time, we clean compacted files here
compactionFilesCache.cleanupCompactedFiles(regionInfo, cfd,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ public Optional<CompactionContext> requestCompaction(int priority,
return Optional.empty();
}
// Before we do compaction, try to get rid of unneeded files to simplify things.
removeUnneededFiles();
removeUnneededFiles(true);

if (region.getRegionServerServices() != null
&& region.getRegionServerServices().isCompactionOffloadEnabled()
Expand Down Expand Up @@ -2063,7 +2063,7 @@ private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
}

private void removeUnneededFiles() throws IOException {
public void removeUnneededFiles(boolean writeWalRecord) throws IOException {
if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
return;
}
Expand Down Expand Up @@ -2092,7 +2092,9 @@ private void removeUnneededFiles() throws IOException {
}

Collection<HStoreFile> newFiles = Collections.emptyList(); // No new files.
writeCompactionWalRecord(delSfs, newFiles);
if (writeWalRecord) {
writeCompactionWalRecord(delSfs, newFiles);
}
replaceStoreFiles(delSfs, newFiles);
refreshStoreSizeAndTotalBytes();
LOG.info("Completed removal of " + delSfs.size() + " unnecessary (expired) file(s) in "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,50 @@ public void testCompactionOffloadTableDescriptor() throws Exception {
TEST_UTIL.compact(TABLENAME, false);
TEST_UTIL.waitFor(6000, () -> COMPACTION_SERVER.requestCount.sum() > 0);
}

@Test
public void testCompactionWithTTL() throws Exception {
TEST_UTIL.getAdmin().compactionSwitch(false, new ArrayList<>());
ColumnFamilyDescriptor cfd =
ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(FAMILY)).setTimeToLive(10).build();
TableDescriptor modifiedTableDescriptor = TableDescriptorBuilder.newBuilder(TABLENAME)
.setColumnFamily(cfd).setCompactionOffloadEnabled(true).build();
TEST_UTIL.getAdmin().modifyTable(modifiedTableDescriptor);
TEST_UTIL.waitTableAvailable(TABLENAME);
// generate hfile all kv are expired
doPutRecord(1, 500, true);
int kVCount = 0;
for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
kVCount += hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
}
}
assertEquals(500, kVCount);
// generate hfile mixed with expired and valid KV
doPutRecord(1, 500, false);
Thread.sleep(10000);
doPutRecord(501, 1000, true);

TEST_UTIL.getAdmin().compactionSwitch(true, new ArrayList<>());
TEST_UTIL.compact(TABLENAME, true);
TEST_UTIL.waitFor(60000, () -> {
int hFileCount = 0;
for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
hFileCount += region.getStore(Bytes.toBytes(FAMILY)).getStorefilesCount();
}
return hFileCount == 1;
});
kVCount = 0;
for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLENAME)) {
for (HStoreFile hStoreFile : region.getStore(Bytes.toBytes(FAMILY)).getStorefiles()) {
kVCount += hStoreFile.getReader().getHFileReader().getTrailer().getEntryCount();
}
}
// To ensure do compaction on compaction server
TEST_UTIL.waitFor(60000, () -> COMPACTION_SERVER.requestCount.sum() > 0);

assertEquals(500, kVCount);
verifyRecord(1,500, false);
verifyRecord(501,1000, true);
}
}

0 comments on commit 85f0291

Please sign in to comment.