Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-25972 Dual File Compaction #5545

Merged
merged 27 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ce955ba
HBASE-25972 Dual File Compactor
kadirozde Nov 27, 2023
ffbcceb
Refactored StoreEngine
kadirozde Dec 2, 2023
2f93abe
Added DualFileStoreFileManager
kadirozde Dec 3, 2023
893783a
Included the latest delete markers in the latest version files
kadirozde Dec 4, 2023
5b63d70
Integrated DualFileWriter and DualFileStoreFileManager with DefaultSt…
kadirozde Dec 11, 2023
61c6bf3
Minor changes and fixes for spotless check failures
kadirozde Dec 11, 2023
2e4325d
Removed DualFileStoreFileManager
kadirozde Dec 12, 2023
ccc4582
Made dual file compaction disabled by default
kadirozde Dec 15, 2023
63b1aa3
Changes for compaction performance improvement and delete use cases in…
kadirozde Dec 21, 2023
6efd551
Handled the new version behavior for delete markers
kadirozde Dec 28, 2023
0fdee04
Changes for the review comments by Viraj
kadirozde Jan 5, 2024
e65141a
Further changes for review comments by Viraj
kadirozde Jan 6, 2024
8288443
Java doc comment edit
kadirozde Jan 13, 2024
558253e
Fixed remaining minor checkstyle warnings
kadirozde Feb 6, 2024
55fc059
Bump up minFilesToCompact by one when DualFileWriter is enabled
kadirozde Feb 13, 2024
2512638
Moved the dual file writing to StoreFileWriter
kadirozde Mar 14, 2024
266b4c3
Fixed the test failure due to a log message
kadirozde Mar 14, 2024
3dd18ec
Test code minor fix and edits
kadirozde Mar 15, 2024
561ad7d
Historical files are generated only with default store engine and def…
kadirozde Mar 15, 2024
1e39455
Simplified the test code and fix some issues
kadirozde Mar 16, 2024
3d25055
Changes for review comments
kadirozde Mar 30, 2024
6632743
Added warning logs
kadirozde Mar 31, 2024
975cf3a
Removed public trackTimestamps() from StoreFileWriter
kadirozde Apr 20, 2024
ed61712
Changes for consistent view while getting storefile list
kadirozde Apr 24, 2024
99e791e
Set liveStoreFile to null only when live file tracking is disabled
kadirozde May 16, 2024
a5c4e29
Changes for spotless
kadirozde May 16, 2024
29218e6
Use assertTrue instead of assert
kadirozde May 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,11 @@ public class PerformanceEvaluation extends Configured implements Tool {
addCommandDescriptor(RandomScanWithRange10000Test.class, "scanRange10000",
"Run random seek scan with both start and stop row (max 10000 rows)");
addCommandDescriptor(RandomWriteTest.class, "randomWrite", "Run random write test");
addCommandDescriptor(RandomDeleteTest.class, "randomDelete", "Run random delete test");
addCommandDescriptor(SequentialReadTest.class, "sequentialRead", "Run sequential read test");
addCommandDescriptor(SequentialWriteTest.class, "sequentialWrite", "Run sequential write test");
addCommandDescriptor(SequentialDeleteTest.class, "sequentialDelete",
"Run sequential delete test");
addCommandDescriptor(MetaWriteTest.class, "metaWrite",
"Populate meta table;used with 1 thread; to be cleaned up by cleanMeta");
addCommandDescriptor(ScanTest.class, "scan", "Run scan test (read every row)");
Expand Down Expand Up @@ -352,7 +355,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
boolean needsDelete = false, exists = admin.tableExists(tableName);
boolean isReadCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("read")
|| opts.cmdName.toLowerCase(Locale.ROOT).contains("scan");
if (!exists && isReadCmd) {
boolean isDeleteCmd = opts.cmdName.toLowerCase(Locale.ROOT).contains("delete");
if (!exists && (isReadCmd || isDeleteCmd)) {
throw new IllegalStateException(
"Must specify an existing table for read commands. Run a write command first.");
}
Expand All @@ -367,7 +371,8 @@ static boolean checkTable(Admin admin, TestOptions opts) throws IOException {
&& opts.presplitRegions != admin.getRegions(tableName).size())
|| (!isReadCmd && desc != null
&& !StringUtils.equals(desc.getRegionSplitPolicyClassName(), opts.splitPolicy))
|| (!isReadCmd && desc != null && desc.getRegionReplication() != opts.replicas)
|| (!(isReadCmd || isDeleteCmd) && desc != null
&& desc.getRegionReplication() != opts.replicas)
|| (desc != null && desc.getColumnFamilyCount() != opts.families)
) {
needsDelete = true;
Expand Down Expand Up @@ -2071,6 +2076,18 @@ protected byte[] generateRow(final int i) {

}

/**
 * Delete test that picks its rows at random from the key space, in contrast to the sequential
 * ordering of its parent {@code SequentialDeleteTest}. The actual delete logic is inherited.
 */
static class RandomDeleteTest extends SequentialDeleteTest {
  RandomDeleteTest(Connection connection, TestOptions testOptions, Status taskStatus) {
    super(connection, testOptions, taskStatus);
  }

  /** Choose a random row key; the loop index {@code i} is deliberately ignored. */
  @Override
  protected byte[] generateRow(final int i) {
    return getRandomRow(this.rand, opts.totalRows);
  }
}

static class ScanTest extends TableTest {
private ResultScanner testScanner;

Expand Down Expand Up @@ -2406,6 +2423,34 @@ boolean testRow(final int i, final long startTime) throws IOException {
}
}

/**
 * Deletes rows in sequential key order. For each row, a {@code Delete} covering every configured
 * column family is issued — directly against the table when autoFlush is on, otherwise through
 * the buffered mutator.
 */
static class SequentialDeleteTest extends BufferedMutatorTest {

  SequentialDeleteTest(Connection connection, TestOptions testOptions, Status taskStatus) {
    super(connection, testOptions, taskStatus);
  }

  /** Row key for iteration {@code i}; subclasses may override to randomize. */
  protected byte[] generateRow(final int i) {
    return format(i);
  }

  @Override
  boolean testRow(final int i, final long startTime) throws IOException {
    Delete delete = new Delete(generateRow(i));
    for (int f = 0; f < opts.families; f++) {
      delete.addFamily(Bytes.toBytes(FAMILY_NAME_BASE + f));
    }
    // Honor the test option controlling WAL durability.
    if (opts.writeToWAL) {
      delete.setDurability(Durability.SYNC_WAL);
    } else {
      delete.setDurability(Durability.SKIP_WAL);
    }
    if (opts.autoFlush) {
      table.delete(delete);
    } else {
      mutator.mutate(delete);
    }
    return true;
  }
}

/*
* Insert fake regions into meta table with contiguous split keys.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ private void printMeta(HFile.Reader reader, Map<byte[], byte[]> fileInfo) throws
Bytes.equals(e.getKey(), HStoreFile.MAJOR_COMPACTION_KEY)
|| Bytes.equals(e.getKey(), HFileInfo.TAGS_COMPRESSED)
|| Bytes.equals(e.getKey(), HStoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY)
|| Bytes.equals(e.getKey(), HStoreFile.HISTORICAL_KEY)
) {
out.println(Bytes.toBoolean(e.getValue()));
} else if (Bytes.equals(e.getKey(), HFileInfo.LASTKEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private boolean isCompactedFile(FileStatus file, HStore store) {
}

private boolean isActiveStorefile(FileStatus file, HStore store) {
return store.getStoreEngine().getStoreFileManager().getStorefiles().stream()
return store.getStoreEngine().getStoreFileManager().getStoreFiles().stream()
.anyMatch(sf -> sf.getPath().equals(file.getPath()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -34,4 +35,14 @@ public interface CellSink {
* @param cell the cell to be added
*/
void append(Cell cell) throws IOException;

/**
 * Append every cell from the given (possibly partial) list of cells of a row, delegating to
 * {@link #append(Cell)} one cell at a time.
 * @param cellList the cells to be added, in order
 */
default void appendAll(List<Cell> cellList) throws IOException {
  for (Cell c : cellList) {
    append(c);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class DateTieredStoreEngine extends StoreEngine<DefaultStoreFlusher,
DateTieredCompactionPolicy, DateTieredCompactor, DefaultStoreFileManager> {
@Override
public boolean needsCompaction(List<HStoreFile> filesCompacting) {
  // Diff artifact fixed: old getStorefiles() line removed; the renamed getStoreFiles() accessor
  // supplies the full store file list to the date-tiered policy.
  return compactionPolicy.needsCompaction(storeFileManager.getStoreFiles(), filesCompacting);
}

@Override
Expand All @@ -65,14 +65,14 @@ private final class DateTieredCompactionContext extends CompactionContext {

@Override
public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(),
filesCompacting);
}

@Override
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting,
isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher, RatioBa

@Override
public boolean needsCompaction(List<HStoreFile> filesCompacting) {
  // Diff artifact fixed: duplicate pre-rename getStorefiles() line removed.
  return compactionPolicy.needsCompaction(this.storeFileManager.getStoreFiles(), filesCompacting);
}

@Override
Expand Down Expand Up @@ -111,7 +111,7 @@ private class DefaultCompactionContext extends CompactionContext {
@Override
public boolean select(List<HStoreFile> filesCompacting, boolean isUserCompaction,
boolean mayUseOffPeak, boolean forceMajor) throws IOException {
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(), filesCompacting,
request = compactionPolicy.selectCompaction(storeFileManager.getStoreFiles(), filesCompacting,
isUserCompaction, mayUseOffPeak, forceMajor);
return request != null;
}
Expand All @@ -124,7 +124,7 @@ public List<Path> compact(ThroughputController throughputController, User user)

@Override
public List<HStoreFile> preSelect(List<HStoreFile> filesCompacting) {
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStorefiles(),
return compactionPolicy.preSelectCompactionForCoprocessor(storeFileManager.getStoreFiles(),
filesCompacting);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
*/
package org.apache.hadoop.hbase.regionserver;

import static org.apache.hadoop.hbase.regionserver.StoreFileWriter.shouldEnableHistoricalCompactionFiles;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -48,36 +52,71 @@ class DefaultStoreFileManager implements StoreFileManager {
private final CompactionConfiguration comConf;
private final int blockingFileCount;
private final Comparator<HStoreFile> storeFileComparator;
/**
* List of store files inside this store. This is an immutable list that is atomically replaced
* when its contents change.
*/
private volatile ImmutableList<HStoreFile> storefiles = ImmutableList.of();

/**
 * Immutable snapshot of this store's file lists. A fresh instance atomically replaces the
 * previous one whenever the contents change, so readers always see a consistent pair.
 */
static class StoreFileList {
  /** Every store file in this store (immutable, atomically replaced on change). */
  final ImmutableList<HStoreFile> all;
  /**
   * The store files that include the latest cells (immutable, atomically replaced on change);
   * {@code null} when live-file tracking is disabled.
   */
  @Nullable
  final ImmutableList<HStoreFile> live;

  StoreFileList(ImmutableList<HStoreFile> allFiles, ImmutableList<HStoreFile> liveFiles) {
    this.all = allFiles;
    this.live = liveFiles;
  }
}

private volatile StoreFileList storeFiles;

/**
* List of compacted files inside this store that needs to be excluded in reads because further
* new reads will be using only the newly created files out of compaction. These compacted files
* will be deleted/cleared once all the existing readers on these compacted files are done.
*/
private volatile ImmutableList<HStoreFile> compactedfiles = ImmutableList.of();
private final boolean enableLiveFileTracking;

/**
 * Builds the manager and publishes an initial empty {@link StoreFileList} snapshot. Live-file
 * tracking is enabled only when historical compaction files are enabled in the configuration.
 * Diff artifact fixed: the scrape carried both the old `this.blockingFileCount =` line and the
 * new `blockingFileCount =` line; only the latter survives.
 */
public DefaultStoreFileManager(CellComparator cellComparator,
  Comparator<HStoreFile> storeFileComparator, Configuration conf,
  CompactionConfiguration comConf) {
  this.cellComparator = cellComparator;
  this.storeFileComparator = storeFileComparator;
  this.comConf = comConf;
  blockingFileCount =
    conf.getInt(HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
  enableLiveFileTracking = shouldEnableHistoricalCompactionFiles(conf);
  storeFiles =
    new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
}

/**
 * Filters the given files down to those holding the latest cell versions, i.e. files that are
 * not marked historical. Each file's reader is initialized first so the historical flag can be
 * read from file metadata.
 */
private List<HStoreFile> getLiveFiles(Collection<HStoreFile> storeFiles) throws IOException {
  List<HStoreFile> result = new ArrayList<>(storeFiles.size());
  for (HStoreFile candidate : storeFiles) {
    candidate.initReader();
    if (candidate.isHistorical()) {
      continue;
    }
    result.add(candidate);
  }
  return result;
}

@Override
public void loadFiles(List<HStoreFile> storeFiles) {
this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, storeFiles);
public void loadFiles(List<HStoreFile> storeFiles) throws IOException {
this.storeFiles = new StoreFileList(ImmutableList.sortedCopyOf(storeFileComparator, storeFiles),
enableLiveFileTracking
? ImmutableList.sortedCopyOf(storeFileComparator, getLiveFiles(storeFiles))
: null);
}

@Override
public final Collection<HStoreFile> getStorefiles() {
return storefiles;
public final Collection<HStoreFile> getStoreFiles() {
return storeFiles.all;
}

@Override
Expand All @@ -86,15 +125,20 @@ public Collection<HStoreFile> getCompactedfiles() {
}

@Override
public void insertNewFiles(Collection<HStoreFile> sfs) {
this.storefiles =
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(this.storefiles, sfs));
public void insertNewFiles(Collection<HStoreFile> sfs) throws IOException {
storeFiles = new StoreFileList(
ImmutableList.sortedCopyOf(storeFileComparator, Iterables.concat(storeFiles.all, sfs)),
enableLiveFileTracking
? ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(storeFiles.live, getLiveFiles(sfs)))
: null);
}

@Override
public ImmutableCollection<HStoreFile> clearFiles() {
  // Swap in an empty snapshot and hand back the previous full file list to the caller.
  // Diff artifact fixed: the interleaved pre-change lines are removed.
  ImmutableList<HStoreFile> result = storeFiles.all;
  storeFiles =
    new StoreFileList(ImmutableList.of(), enableLiveFileTracking ? ImmutableList.of() : null);
  return result;
}

Expand All @@ -107,7 +151,7 @@ public Collection<HStoreFile> clearCompactedFiles() {

@Override
public final int getStorefileCount() {
  // Count over the full (not live-only) file list. Diff artifact fixed: stale line removed.
  return storeFiles.all.size();
}

@Override
Expand All @@ -117,28 +161,38 @@ public final int getCompactedFilesCount() {

@Override
public void addCompactionResults(Collection<HStoreFile> newCompactedfiles,
Collection<HStoreFile> results) {
this.storefiles = ImmutableList.sortedCopyOf(storeFileComparator, Iterables
.concat(Iterables.filter(storefiles, sf -> !newCompactedfiles.contains(sf)), results));
Collection<HStoreFile> results) throws IOException {
ImmutableList<HStoreFile> liveStoreFiles = null;
if (enableLiveFileTracking) {
liveStoreFiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(Iterables.filter(storeFiles.live, sf -> !newCompactedfiles.contains(sf)),
getLiveFiles(results)));
}
storeFiles =
new StoreFileList(
ImmutableList
.sortedCopyOf(storeFileComparator,
Iterables.concat(
Iterables.filter(storeFiles.all, sf -> !newCompactedfiles.contains(sf)), results)),
liveStoreFiles);
// Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
// Let a background thread close the actual reader on these compacted files and also
// ensure to evict the blocks from block cache so that they are no longer in
// cache
newCompactedfiles.forEach(HStoreFile::markCompactedAway);
this.compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(this.compactedfiles, newCompactedfiles));
compactedfiles = ImmutableList.sortedCopyOf(storeFileComparator,
Iterables.concat(compactedfiles, newCompactedfiles));
}

@Override
public void removeCompactedFiles(Collection<HStoreFile> removedCompactedfiles) {
  // Drop the fully-archived files from the compacted-away list, keeping it sorted.
  // Diff artifact fixed: the interleaved pre-change lines are removed.
  compactedfiles = compactedfiles.stream().filter(sf -> !removedCompactedfiles.contains(sf))
    .sorted(storeFileComparator).collect(ImmutableList.toImmutableList());
}

@Override
public final Iterator<HStoreFile> getCandidateFilesForRowKeyBefore(KeyValue targetKey) {
  // All files are candidates; iterate in reverse of the sorted order.
  // Diff artifact fixed: stale `this.storefiles` line removed.
  return storeFiles.all.reverse().iterator();
}

@Override
Expand All @@ -153,25 +207,28 @@ public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(

@Override
public final Optional<byte[]> getSplitPoint() throws IOException {
  // Compute the split point from the full file list. Diff artifact fixed: stale line removed.
  return StoreUtils.getSplitPoint(storeFiles.all, cellComparator);
}

@Override
public final Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
byte[] stopRow, boolean includeStopRow) {
public Collection<HStoreFile> getFilesForScan(byte[] startRow, boolean includeStartRow,
byte[] stopRow, boolean includeStopRow, boolean onlyLatestVersion) {
if (onlyLatestVersion && enableLiveFileTracking) {
return storeFiles.live;
}
// We cannot provide any useful input and already have the files sorted by seqNum.
return getStorefiles();
return getStoreFiles();
}

@Override
public int getStoreCompactionPriority() {
  // Priority shrinks as the store approaches the blocking file count; avoid colliding with the
  // reserved user-compaction priority. Diff artifact fixed: stale `storefiles` line removed.
  int priority = blockingFileCount - storeFiles.all.size();
  return (priority == HStore.PRIORITY_USER) ? priority + 1 : priority;
}

@Override
public Collection<HStoreFile> getUnneededFiles(long maxTs, List<HStoreFile> filesCompacting) {
ImmutableList<HStoreFile> files = storefiles;
ImmutableList<HStoreFile> files = storeFiles.all;
// 1) We can never get rid of the last file which has the maximum seqid.
// 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
return files.stream().limit(Math.max(0, files.size() - 1)).filter(sf -> {
Expand Down
Loading